In [1]:
import pandas as pd
import numpy as np
import random
import quixstreams as qx

In [2]:
# ORIGINAL

import ctypes
import math
from datetime import datetime
from typing import Dict, List, Optional

import pandas as pd

from quixstreams import TimeseriesDataRaw


def from_dataframe(data_frame: pd.DataFrame, epoch: int = 0) -> 'TimeseriesDataRaw':
    """
    Converts from pandas DataFrame to TimeseriesDataRaw
    :param data_frame: The pandas DataFrame to convert to TimeseriesData
    :param epoch: The epoch to add to each time value when converting to TimeseriesData. Defaults to 0
    :return: Converted TimeseriesData
    """

    if data_frame is None:
        return None

    parameter_data_raw = TimeseriesDataRaw()

    possible_time_labels = set(['time', 'timestamp', 'datetime'])

    # first or default in columns
    time_label = next((x for x in data_frame.columns if x.lower() in possible_time_labels), None)

    if time_label is None:
        possible_time_vals = data_frame.select_dtypes(include=['int', 'int64'])
        if possible_time_vals.count()[0] == 0:
            raise Exception(
                "pandas DataFrame does not contain a suitable time column. Make sure to label the column 'time' or 'timestamp', else first integer column will be picked up as time")
        time_label = possible_time_vals.columns[0]

    def get_value_as_type(val_type, val, type_conv=None) -> []:
        if not isinstance(val, val_type) and type_conv is not None:
            new_val = type_conv(val)
            return new_val
        return val

    def convert_time(val):
        if isinstance(val, int):
            return val
        if isinstance(val, datetime):
            if isinstance(val, pd.Timestamp):
                val = val.to_pydatetime()
            return TimeConverter.to_unix_nanoseconds(val)
        if isinstance(val, str):
            return TimeConverter.from_string(val)
        return int(val)

    def str_type_converter(val_to_convert):
        if val_to_convert is None:
            return None
        return str(val_to_convert)

    def bytes_type_converter(val_to_convert):
        if val_to_convert is None:
            return None
        return bytes(val_to_convert)

    def double_type_converter(val_to_convert):
        if val_to_convert is None:
            return None
        if pd.isnull(val_to_convert):
            return None
        return float(val_to_convert)

    rows_no = len(data_frame.index)  # dataframe rows count
    timestamps = [None] * rows_no
    string_values = {}
    binary_values = {}
    numeric_values = {}
    tag_values = {}

    def _add_tag(tag_colname: str, index: int, value: str):
        if tag_colname not in tag_values:
            tag_values[tag_colname] = [None] * rows_no
        tag_values[tag_colname][index] = value

    def _add_numeric(tag_colname: str, index: int, value: float):
        if tag_colname not in numeric_values:
            numeric_values[tag_colname] = [None] * rows_no
        numeric_values[tag_colname][index] = value

    def _add_string(tag_colname: str, index: int, value: str):
        if tag_colname not in string_values:
            string_values[tag_colname] = [None] * rows_no
        string_values[tag_colname][index] = value

    def _add_binary(tag_colname: str, index: int, value: bytes):
        if tag_colname not in binary_values:
            binary_values[tag_colname] = [None] * rows_no
        binary_values[tag_colname][index] = value

    row_index = -1
    for panda_index, panda_row in data_frame.iterrows():
        row_index = row_index + 1
        time = get_value_as_type(int, panda_row[time_label], convert_time)
        time = time - epoch
        timestamps[row_index] = time

        for panda_col_label, content in panda_row.items():
            panda_col_label = str(panda_col_label)  # just in case it is a numeric header
            if panda_col_label == time_label:
                continue
            label_type = type(content)
            isnumeric = (label_type == int or label_type == float or label_type == complex)
            if isnumeric:
                if math.isnan(content):
                    continue  # ignore it, as panda uses NaN instead of None, unable to detect difference
                if panda_col_label.startswith('TAG__'):  # in case user of lib didn't put it in quote don't throw err
                    str_val = get_value_as_type(str, content, str_type_converter)
                    _add_tag(panda_col_label[5:], row_index, str_val)
                else:
                    num_val = get_value_as_type(float, content, double_type_converter)
                    _add_numeric(panda_col_label, row_index, num_val)
            else:
                isbytes = (label_type == bytes or label_type == bytearray)
                if isbytes:
                    if panda_col_label.startswith('TAG__'):
                        str_val = get_value_as_type(str, content, str_type_converter)
                        _add_tag(panda_col_label[5:], row_index, str_val)
                    else:
                        bytes_val = get_value_as_type(bytes, content, bytes_type_converter)
                        _add_binary(panda_col_label, row_index, bytes_val)
                else:
                    str_val = get_value_as_type(str, content, str_type_converter)
                    if panda_col_label.startswith('TAG__'):
                        _add_tag(panda_col_label[5:], row_index, str_val)
                    else:
                        _add_string(panda_col_label, row_index, str_val)

    parameter_data_raw.set_values(
        epoch=epoch,
        timestamps=timestamps,
        numeric_values=numeric_values,
        string_values=string_values,
        tag_values=tag_values,
        binary_values=binary_values
    )

    return parameter_data_raw

In [3]:
# ORIGINAL

import ctypes
import math
from datetime import datetime
from typing import Dict, List, Optional

import pandas as pd

from quixstreams import TimeseriesDataRaw


def from_dataframe_new(data_frame: pd.DataFrame, epoch: int = 0) -> 'TimeseriesDataRaw':
    """
    Converts from pandas DataFrame to TimeseriesDataRaw
    :param data_frame: The pandas DataFrame to convert to TimeseriesData
    :param epoch: The epoch to add to each time value when converting to TimeseriesData. Defaults to 0
    :return: Converted TimeseriesData
    """

    if data_frame is None:
        return None

    # find time label: first or default in columns
    possible_time_labels = set(['time', 'timestamp', 'datetime'])
    time_label = next((x for x in data_frame.columns if x.lower() in possible_time_labels), None)
    if time_label is None:
        possible_time_vals = data_frame.select_dtypes(include=['int', 'int64'])
        if possible_time_vals.count()[0] == 0:
            raise Exception(
                "pandas DataFrame does not contain a suitable time column. Make sure to label the column 'time' or 'timestamp', else first integer column will be picked up as time")
        time_label = possible_time_vals.columns[0]

    def get_value_as_type(val_type, val, type_conv=None) -> []:
        if not isinstance(val, val_type) and type_conv is not None:
            new_val = type_conv(val)
            return new_val
        return val

    def convert_time(val):
        if isinstance(val, int):
            return val
        if isinstance(val, datetime):
            if isinstance(val, pd.Timestamp):
                val = val.to_pydatetime()
            return TimeConverter.to_unix_nanoseconds(val)
        if isinstance(val, str):
            return TimeConverter.from_string(val)
        return int(val)
    
    def get_type_mode(data_frame: pd.DataFrame, col_i: str):
        """
        if len(data_frame) <= 5:
            sample_n = 1
        elif len(data_frame) <= 50:
            sample_n = 5
        else:
            sample_n = 9
        type_samples_col_i = [type(sample_i) for sample_i in np.random.choice(data_frame[col_i], sample_n, replace=False)]
        type_mode_col_i = max(set(type_samples_col_i), key=type_samples_col_i.count)

        """
        
        return type(data_frame[col_i].iloc[0])

    def get_labels_by_type(data_frame: pd.DataFrame):
        int_labels = []
        float_labels = []
        complex_labels = []
        string_labels = []
        bytes_labels = []
        for label_i in data_frame.columns:
            label_i_type = get_type_mode(data_frame, label_i)
            if label_i_type == int:
                int_labels.append(label_i)
            elif label_i_type == float:
                float_labels.append(label_i)
            elif label_i_type == complex:
                complex_labels.append(label_i)
            elif label_i_type == bytes:
                bytes_labels.append(label_i)
            else:
                string_labels.append(label_i)
        return int_labels, float_labels, complex_labels, string_labels, bytes_labels

        
    data_frame = data_frame.dropna(subset=time_label) # Ensure rows with nulls int time column are dropped
    # data_frame = data_frame.reset_index(drop=True) # Ensure correct indexing
    
    # Get timestamps array
    timestamps = [get_value_as_type(int, ti, convert_time) - epoch for ti in data_frame[time_label]]
    
    # Get TAGS dictionary
    data_frame_labels = [label_i for label_i in data_frame.columns if label_i != time_label]
    tag_labels = [label_i for label_i in data_frame_labels if label_i.startswith('TAG__')]
    data_frame_nulls = data_frame[tag_labels].isna()
    tag_values_dic = data_frame[tag_labels].astype(str).mask(data_frame_nulls, None).to_dict(orient="list")


    # Get columns types
    data_frame_labels = [label_i for label_i in data_frame_labels if label_i not in tag_labels]
    int_labels, float_labels, complex_labels, string_labels, bytes_labels = get_labels_by_type(data_frame[data_frame_labels])

    # Get numeric values
    int_values_dic = data_frame[int_labels].astype(int, errors="ignore").mask(data_frame[int_labels].isna(), None).to_dict(orient="list")
    float_values_dic = data_frame[float_labels].astype(float, errors="ignore").mask(data_frame[float_labels].isna(), None).to_dict(orient="list")
    complex_values_dic = data_frame[complex_labels].astype(complex, errors="ignore").mask(data_frame[complex_labels].isna(), None).to_dict(orient="list")
    numeric_values_dic = int_values_dic
    numeric_values_dic.update(float_values_dic)
    numeric_values_dic.update(complex_values_dic)
    
    # Get string values
    data_frame_nulls = data_frame[string_labels].isna()
    string_values_dic = data_frame[string_labels].astype(str, errors="ignore").mask(data_frame_nulls, None).to_dict(orient="list")

    # Get bytes values
    data_frame_nulls = data_frame[bytes_labels].isna()
    binary_values_dic = data_frame[bytes_labels].astype(bytes, errors="ignore").mask(data_frame_nulls, None).to_dict(orient="list") 

    # Create TimeseriesDataRaw object

    parameter_data_raw = TimeseriesDataRaw()    
    parameter_data_raw.set_values(
        epoch=epoch,
        timestamps=timestamps,
        numeric_values=numeric_values_dic,
        string_values=string_values_dic,
        tag_values=tag_values_dic,
        binary_values=binary_values_dic
    )

    return parameter_data_raw


In [4]:
import string
import uuid
import random
import os

def initiate_dfi(int_cols, float_cols, complex_cols, string_cols, bytes_cols):
    df = pd.DataFrame({"Timestamp": [int((pd.Timestamp.now() - pd.Timestamp("1970-01-01")) / pd.Timedelta('1s') * 1000000000)]})
    for col in int_cols:
        df[col] = random.randint(-10**9, 10**9)
    for col in float_cols:
        df[col] = random.uniform(-10**9, 10**9)
    for col in complex_cols:
        df[col] = np.random.uniform(-1, 1) + 1.j * np.random.uniform(-1, 1)
            
    for col in string_cols:
        df[col] = uuid.uuid4()
    for col in bytes_cols:
        df[col] = os.urandom(16)
    return df

def add_randonm_nulls(df: pd.DataFrame, perc_nulls: float):
    ix = [(row, col) for row in range(df.shape[0]) for col in range(1, df.shape[1])]
    for row, col in random.sample(ix, int(round(perc_nulls*len(ix)))):
        df.iat[row, col] = np.nan
    return df

def initiate_df(rows: int, number_numeric_cols:int, number_string_cols: int, number_bytes_cols: int):
    df = pd.DataFrame()
    alphabet_list = list(string.ascii_lowercase.upper())
    
    int_number = random.randint(1, number_numeric_cols)
    float_number = random.randint(1, number_numeric_cols-int_number)
    #float_number = number_numeric_cols-int_number
    complex_number = number_numeric_cols-int_number-float_number
    int_cols = ["Param "+i for i in alphabet_list[:int_number]]
    float_cols = ["Param "+i for i in alphabet_list[int_number:int_number+float_number]]
    #float_cols = ["Param "+i for i in alphabet_list[int_number:number_numeric_cols]]
    complex_cols = ["Param "+i for i in alphabet_list[int_number+float_number:number_numeric_cols]]
    #complex_cols = []
    string_cols = ["Param "+i for i in alphabet_list[number_numeric_cols:number_numeric_cols+number_string_cols]]
    bytes_cols = ["Param "+i for i in alphabet_list[number_numeric_cols+number_string_cols:number_numeric_cols+number_string_cols+number_bytes_cols]]
    for i in range(rows):
        df_i = initiate_dfi(int_cols, float_cols, complex_cols, string_cols, bytes_cols)
        df = pd.concat([df, df_i], axis=0)
    df = df.reset_index(drop=True)
    df = add_randonm_nulls(df, 0.01)
    return df

In [19]:
import time


df_measurements = pd.DataFrame()

for df_len in [1, 5, 10, 50, 100, 500, 1000, 10000, 100000]:
    
    df = initiate_df(rows = df_len, 
                     number_numeric_cols = 10, 
                     number_string_cols = 5, 
                     number_bytes_cols = 3)
    print("Rows", df_len)
    
    time_old_i = time.perf_counter()
    from_dataframe(df, 0)
    time_old_j = time.perf_counter()
    delta_old = time_old_j - time_old_i
    
    time_new_i = time.perf_counter()
    from_dataframe_new(df, 0)
    time_new_j = time.perf_counter()
    delta_new = time_new_j - time_new_i
    
    df_measurements = pd.concat([df_measurements, 
                                pd.DataFrame({"Rows": [df_len], "OLD": [delta_old], "NEW": [delta_new], "NewVSOld": [delta_new/delta_old]})], 
                               axis = 0)
    
df_measurements    

Rows 1
Rows 5
Rows 10
Rows 50
Rows 100
Rows 500
Rows 1000
Rows 10000
Rows 100000


Unnamed: 0,Rows,OLD,NEW,NewVSOld
0,1,0.001272,0.007987,6.280075
0,5,0.001354,0.00603,4.453397
0,10,0.002015,0.006123,3.03941
0,50,0.008088,0.011686,1.44473
0,100,0.011125,0.013225,1.188778
0,500,0.06266,0.044006,0.702294
0,1000,0.10202,0.089015,0.872524
0,10000,1.343007,0.984935,0.73338
0,100000,13.4761,10.693345,0.793504


In [20]:
df_measurements = pd.DataFrame()

for df_len in [1, 5, 10, 50, 100, 500, 1000, 10000, 100000]:
    
    df = initiate_df(rows = df_len, 
                     number_numeric_cols = 2, 
                     number_string_cols = 1, 
                     number_bytes_cols = 0)
    print("Rows", df_len)
    
    time_old_i = time.perf_counter()
    from_dataframe(df, 0)
    time_old_j = time.perf_counter()
    delta_old = time_old_j - time_old_i
    
    time_new_i = time.perf_counter()
    from_dataframe_new(df, 0)
    time_new_j = time.perf_counter()
    delta_new = time_new_j - time_new_i
    
    df_measurements = pd.concat([df_measurements, 
                                pd.DataFrame({"Rows": [df_len], "OLD": [delta_old], "NEW": [delta_new], "NewVSOld": [delta_new/delta_old]})], 
                               axis = 0)
    
df_measurements   

Rows 1
Rows 5
Rows 10
Rows 50
Rows 100
Rows 500
Rows 1000
Rows 10000
Rows 100000


Unnamed: 0,Rows,OLD,NEW,NewVSOld
0,1,0.001759,0.006794,3.862804
0,5,0.000886,0.005327,6.013207
0,10,0.001196,0.006005,5.022246
0,50,0.00327,0.006045,1.848337
0,100,0.005131,0.006774,1.320081
0,500,0.020028,0.009946,0.496605
0,1000,0.041342,0.016055,0.388336
0,10000,0.413914,0.125784,0.30389
0,100000,4.787079,1.772703,0.37031


In [5]:
df = initiate_df(100, 10, 5, 3)
df.head()

Unnamed: 0,Timestamp,Param A,Param B,Param C,Param D,Param E,Param F,Param G,Param H,Param I,Param J,Param K,Param L,Param M,Param N,Param O,Param P,Param Q,Param R
0,1678365918195919104,-982268394,500895087.0,48567155,871424618.0,-21949923.0,990199804,954054589,-473656900.0,25380450.0,0.700805-0.868499j,03105911-9897-4d90-a9e8-c31426ee4a81,e38bf161-2ad9-48eb-ac2b-7b06cca6fede,1001e735-489b-4dfc-bc46-59667059d6a1,509087fe-4d89-4567-a119-c3c3ff2edf08,a2fdf259-be05-4201-bdbd-1566d60d1418,b'\xbdg\xb3\xd4_\x12U\xa8\x98*UW\xc9\x83\x0c\xfc',b'\xbc\x16\x0f\x82/\x1d\n\xa7\x91uU\xe0\xb4\xe...,b'\xd3\x00_\xbc\xa0\x8cW\n\x8e`\x112uC*?'
1,1678365918208485888,602595364,611598063.0,-255298595,-668802611.0,-910458320.0,-82451559,-93032102,125081700.0,-600317900.0,-0.384733+0.927890j,79df4b0d-c35b-445a-838f-4533199681e0,03177dbc-9829-4730-9be6-6a2b334278df,43b1d4fe-4be6-4440-a63b-5d07ce823087,223df8e4-a6d2-4557-9066-61d2ebb73e19,588a60ba-40c8-434d-9483-77463ab519a5,b'\xd2\x88x9|\xff&\x02<\xa0\x86G\xe3N \x1f',b'\xea\x00\xd0\xbd\xfc\xf5\xb3\xa3\x9b\xf7\xa8...,b'\x8a\xd1\rT4}\xf2s\x10\x07LJ\xe1\xc0)\x85'
2,1678365918213491968,-613191481,347661530.0,-279967642,-879621713.0,,-242683535,187353201,-693494300.0,250605000.0,0.238096+0.652271j,a9e45a1b-e888-47c6-8e3d-6157f1b42870,9e620504-5534-4c97-9170-0ecf42d2b605,1360727d-0fc5-4b47-bb37-cd3a525da0d0,0e3e98b5-936a-42ec-a669-efcc89a5b914,9eb95011-cbb9-4598-a843-1710c9d220b9,b'\xc4\x1c\x95\x13\x0f\xfcg\x11\x15\xd4\x7f\xa...,b':\xea\xb7\x95G\x02R\xb2\xc9\x8d \xf9g@\x1c\xf8',b'\xf1w\xeaD\x90~?\xc1\xd8\xe2\x9a\x01\x9a\xf4...
3,1678365918220315904,-83997683,-687141850.0,845323272,-576681934.0,383235381.0,-703568268,138659402,-183324100.0,-484481400.0,0.050204-0.717171j,a12731d9-19ad-49be-9880-7c6bac47c3b0,2c8fa710-c959-4e3e-acea-b3d82040f1c7,ad5ac67e-ab81-4a32-a998-2933bace65b2,e4972cf9-5416-4587-beac-c5ed80c40252,f2453198-e2f2-44be-96d6-73d4afb4ad39,b';e\xbb\xbftD\x17[P\x7f\xff\xa6~\x0f\x1e\x92',b'\xfa\xa5\x1e\xa2\x9e\xa3\x83Sb\x93\x14\x97{w...,b'\xbd\xfc\xba\x1cP;U\xba0\xa2\x00&/\x80E\xcc'
4,1678365918227960064,354873501,-470013432.0,692473667,-899762912.0,578392405.0,279750804,-135703279,-277879900.0,422351000.0,-0.960423+0.576549j,,e2aa8d4a-a73e-4cc7-b2c0-4e74c0d77f66,eaf1f5c8-d867-44b2-a12c-79e379f17c9a,225cfc23-477d-4227-86c3-ed0f9226ecd1,8c8addfc-d510-4e6b-aeb6-b8f42ca70966,b'\xf2g\xdb\x1b\xbc\xca w\xc27\xfbW\x82\xd4.\xc7',b'\xc1o\x1fv\x11\xaa\n\xb5\xc9\xcc\x95\x0cN\xb...,"b""\xdd\x19'\xafA\xb6W {]F\r\x03\xb1\xfb\xfc"""


In [36]:
df.dtypes

Timestamp         int64
Param A         float64
Param B           int64
Param C           int64
Param D           int64
Param E         float64
Param F         float64
Param G         float64
Param H         float64
Param I      complex128
Param J      complex128
Param K          object
Param L          object
Param M          object
Param N          object
Param O          object
Param P          object
Param Q          object
Param R          object
dtype: object

In [37]:
%%time 
from_dataframe(df, 0)

TypeError: can't convert complex to float

In [483]:
%%time 
from_dataframe_new(df, 0)

Wall time: 37.4 ms


<quixstreams.models.timeseriesdataraw.TimeseriesDataRaw at 0x1f2bff7e4c0>

In [484]:
df_2 = initiate_df(10, 10, 5, 3)

6
4


In [485]:
%%time 
from_dataframe(df_2, 0)

Wall time: 3 ms


<quixstreams.models.timeseriesdataraw.TimeseriesDataRaw at 0x1f2bfe4cdf0>

In [486]:
%%time 
from_dataframe_new(df_2, 0)

Wall time: 16 ms


<quixstreams.models.timeseriesdataraw.TimeseriesDataRaw at 0x1f2bff24a30>

In [487]:
df_3 = initiate_df(1, 10, 5, 3)

8
2


In [488]:
%%time 
from_dataframe(df_3, 0)

Wall time: 2 ms


<quixstreams.models.timeseriesdataraw.TimeseriesDataRaw at 0x1f2bfe4b820>

In [489]:
%%time 
from_dataframe_new(df_3, 0)

Wall time: 14.3 ms


<quixstreams.models.timeseriesdataraw.TimeseriesDataRaw at 0x1f2bff7e1f0>