In [1]:
import pyreadr
import numpy as np
import pandas as pd
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
import ast

import os
import pyarrow.parquet as pq

In [2]:
from datetime import datetime
from pytz import timezone
from sklearn.utils import Bunch

# Read in the complete data
acc_data_trim = pd.read_parquet('data/acc_v0.parquet')  # Replace with the actual file path

# Convert timestamp to local time
#acc_data['timestamp'] = pd.to_datetime(acc_data['timestamp'], format='%Y-%m-%d %H:%M:%S', utc=True)
#acc_data['local_timestamp'] = acc_data['timestamp'].dt.tz_convert('Africa/Nairobi')

# Subset data
#try:
#    acc_data_trim = acc_data[['individual_local_identifier', 'local_timestamp', 
#                              'eobs_accelerations_raw', 'eobs_acceleration_sampling_frequency_per_axis']]
#except KeyError as e:
#    print(f"KeyError: {e}")


In [4]:

# Join with metadata
metadata = pd.read_csv('data/metadata.csv')  # Replace with the actual file path
acc_data_trim = acc_data_trim.merge(
    metadata[['individual_local_identifier', 'tag_local_identifier', 'group_id']],
    on='individual_local_identifier',
    how='left'
)


In [5]:
from concurrent.futures import ProcessPoolExecutor
import pandas as pd
import numpy as np

# Function to process a chunk of data
def process_chunk(chunk):
    return chunk['eobs_accelerations_raw'].str.split(' ', expand=True).astype(float)

# Split the DataFrame into chunks
num_chunks = 20  # Adjust based on the number of CPU cores
chunks = np.array_split(acc_data_trim, num_chunks)

# Process chunks in parallel
with ProcessPoolExecutor(max_workers=num_chunks) as executor:
    results = list(executor.map(process_chunk, chunks))

# Combine the results back into a single DataFrame
d2 = pd.concat(results, ignore_index=True)

  return bound(*args, **kwds)


In [None]:

# Split eobs_accelerations_raw into separate columns
d2 = acc_data_trim['eobs_accelerations_raw'].str.split(' ', expand=True).astype(float)


In [16]:

# Rename columns
num_cols = d2.shape[1]
col_names = [f"{axis}{i+1}" for i in range(num_cols // 3) for axis in ['x', 'y', 'z']]
#d2.columns = col_names

# Add timestamp and tag columns
d2['local_timestamp'] = acc_data_trim['local_timestamp']
d2['tag'] = acc_data_trim['tag_local_identifier_x']

# Remove rows with missing values
inds = d2.dropna().index
acc_data_trim = acc_data_trim.loc[inds]
d2 = d2.loc[inds]

# Split into x, y, z components
x_d = d2.filter(like='x')
y_d = d2.filter(like='y')
z_d = d2.filter(like='z')

# Read calibration data
acc_calib = pd.read_csv('data/acc_calib.csv')  # Replace with the actual file path
acc_calib['tag'] = acc_calib['tag'].astype(str)
acc_calib[['x0', 'y0', 'z0', 'Sx', 'Sy', 'Sz']] = acc_calib[['x0', 'y0', 'z0', 'Sx', 'Sy', 'Sz']].astype(float)


In [None]:

# Match indices between acc_calib and acc_data_trim
ind_id = acc_data_trim['tag_local_identifier_x'].map(acc_calib.set_index('tag').index)

# Calculate calibrated accelerations
acc_data_trim['x_cal'] = (x_d.values - acc_calib.loc[ind_id, 'x0'].values[:, None]) * acc_calib.loc[ind_id, 'Sx'].values[:, None] * 9.81
acc_data_trim['y_cal'] = (y_d.values - acc_calib.loc[ind_id, 'y0'].values[:, None]) * acc_calib.loc[ind_id, 'Sy'].values[:, None] * -9.81
acc_data_trim['z_cal'] = (z_d.values - acc_calib.loc[ind_id, 'z0'].values[:, None]) * acc_calib.loc[ind_id, 'Sz'].values[:, None] * 9.81

In [11]:
# ---- Helper function: dy_acc ----
def dy_acc(vect, win_size=7):
    """
    Calculate the dynamic acceleration (dy_acc) for a given vector.
    """
    if vect is None or len(vect) == 0:
        raise ValueError("Input vector is empty or invalid.")
    
    pad_size = int(win_size / 2 - 0.5)
    padded = np.pad(vect, (pad_size, pad_size), constant_values=np.nan)
    acc_vec = np.empty(len(vect))
    acc_vec[:] = np.nan

    for i in range(len(vect)):
        window = padded[i : i + (2 * pad_size + 1)]
        m_ave = np.nanmean(window)
        acc_vec[i] = vect[i] - m_ave
    
    return acc_vec

def process_row(row):
    """
    Process a single row to calculate dynamic acceleration components and derived metrics.
    """
    x_component = np.abs(dy_acc(row['x_cal_array']))
    y_component = np.abs(dy_acc(row['y_cal_array']))
    z_component = np.abs(dy_acc(row['z_cal_array']))

    vectorial_sum = np.sqrt(x_component**2 + y_component**2 + z_component**2)
    ave_vedba_value = np.nanmean(vectorial_sum)

    pitch = np.arctan2(x_component, np.sqrt(y_component**2 + z_component**2))
    ave_pitch = np.nanmean(pitch)

    return ave_vedba_value, ave_pitch


In [2]:
# ---- Main Processing ----
# Load the data
acc_data_trim = pd.read_parquet('data/acc_v1.parquet')
acc_data_trim = acc_data_trim.dropna()


In [26]:
acc_data_trim['x_cal_array'] = acc_data_trim['x_cal'].apply(
    lambda d: np.array(list(d.values())) if isinstance(d, dict) else np.nan
)
acc_data_trim['y_cal_array'] = acc_data_trim['y_cal'].apply(
    lambda d: np.array(list(d.values())) if isinstance(d, dict) else np.nan
)
acc_data_trim['z_cal_array'] = acc_data_trim['z_cal'].apply(
    lambda d: np.array(list(d.values())) if isinstance(d, dict) else np.nan
)


# Check for NaN or non-finite values in x_cal_array, y_cal_array, z_cal_array
invalid_rows = acc_data_trim[
    acc_data_trim['x_cal_array'].apply(lambda x: not isinstance(x, np.ndarray) or not np.isfinite(x).all()) |
    acc_data_trim['y_cal_array'].apply(lambda x: not isinstance(x, np.ndarray) or not np.isfinite(x).all()) |
    acc_data_trim['z_cal_array'].apply(lambda x: not isinstance(x, np.ndarray) or not np.isfinite(x).all())
]

# Print the invalid rows
print(f"Number of invalid rows: {len(invalid_rows)}")
print(invalid_rows)

TypeError: ufunc 'isfinite' not supported for the input types, and the inputs could not be safely coerced to any supported types according to the casting rule ''safe''

In [None]:
# with joblib
# # from joblib import Parallel, delayed

# worker_count = 8
# results = Parallel(n_jobs=worker_count)(delayed(process_row)(row) for _, row in acc_data_trim[:200000].iterrows())

# with chunks
# results = []
# chunk_size = 100000  # Adjust the chunk size as needed
# data_chunks = list(chunk_data(acc_data_trim.iterrows(), chunk_size))

# with ProcessPoolExecutor(max_workers=16) as executor:
#     futures = list(tqdm(executor.map(process_chunk, data_chunks), total=len(data_chunks), desc="Collecting results"))
#     for future in futures:
#         results.extend(future)

results = []
with ProcessPoolExecutor(max_workers=8) as executor:
    futures = {executor.submit(process_row, row): i for i, row in acc_data_trim.iterrows()}
    for f in futures:
        results.append(f.result())


In [2]:
######################################
import pyarrow.parquet as pq
import pandas as pd
import numpy as np
from concurrent.futures import ProcessPoolExecutor

# Function to process a chunk of data
def process_chunk(chunk):
    # Convert 'x_cal', 'y_cal', 'z_cal' dict columns to numpy arrays
    chunk['x_cal_array'] = chunk['x_cal'].apply(
        lambda d: np.array(list(d.values())) if isinstance(d, dict) else np.nan
    )
    chunk['y_cal_array'] = chunk['y_cal'].apply(
        lambda d: np.array(list(d.values())) if isinstance(d, dict) else np.nan
    )
    chunk['z_cal_array'] = chunk['z_cal'].apply(
        lambda d: np.array(list(d.values())) if isinstance(d, dict) else np.nan
    )
    # Process each row in the chunk
    results = [process_row(row) for _, row in chunk.iterrows()]
    ave_vedba, ave_pitch = zip(*results)
    chunk['ave_vedba'] = ave_vedba
    chunk['ave_pitch'] = ave_pitch
    return chunk

# Read the Parquet file in chunks
file_path = 'data/acc_v1.parquet'  # Replace with your file path
parquet_file = pq.ParquetFile(file_path)

# Process the file in chunks
chunk_size = 10000  # Define the number of rows per chunk
results = []

with ProcessPoolExecutor(max_workers=40) as executor:
    futures = []
    for i in range(0, parquet_file.num_row_groups):
        # Read a chunk (row group)
        table = parquet_file.read_row_group(i)
        chunk = table.to_pandas()
        # Submit the chunk for parallel processing
        futures.append(executor.submit(process_chunk, chunk))
    
    # Collect results as they complete
    for future in futures:
        results.append(future.result())

# Combine all processed chunks into a single DataFrame
acc_data_trim = pd.concat(results, ignore_index=True)

# Save the processed data
#acc_data_trim.to_parquet('data/processed_acc_v1.parquet', index=False)
#print("Data saved to 'data/processed_acc_v1.parquet'")
######################################

NameError: name 'process_row' is not defined

In [None]:
# Store back the results
ave_vedba, ave_pitch = zip(*results)
acc_data_trim['ave_vedba'] = ave_vedba
acc_data_trim['ave_pitch'] = ave_pitch

# Log-transform like in R
acc_data_trim['log_vedba'] = np.log(acc_data_trim['ave_vedba'])

# Select relevant columns
filtered_data = acc_data_trim[['individual_local_identifier', 
                               'local_timestamp', 
                               'tag_local_identifier', 
                               'log_vedba', 
                               'ave_pitch']]

# Save the processed data
filtered_data.to_parquet('data/acc_vedba.parquet', index=False)
print("Data saved to 'data/acc_vedba.parquet'")

Data saved to 'data/acc_vedba.parquet'
