<h1>Imports</h1>

In [None]:
""" REQUIRES PISON CLOUD BINDINGS REPO IN LOCAL TREE WITH CODE IN WORKSPACE 
-- CODE SHOULD BE ADDED TO CHECK AGILITY AND FOCUS SESSIONS WHEN SESSION IDs GET ADDED AND USE FUTURE BULK DEVICE CONVERTER"""

In [None]:
import pandas as pd
import json
import pandas_gbq
import google.auth
import re
import numpy as np
import datetime as dt
import grpc
import logging
import subprocess
import google.auth.transport.requests
import google.oauth2.id_token
from abc import ABC
from tqdm import tqdm
from scipy.signal import *
from datetime import datetime
from google.cloud import bigquery
from scipy import signal
from typing import List, Tuple, Union
from scipy.signal import butter, iirnotch, filtfilt
from google.protobuf.timestamp_pb2 import Timestamp
from google.protobuf.json_format import MessageToDict
from google.auth import default

#dag
from pison_ml.processors.filtering import EMGButterNotchFilter
from pison_ml.processors.filtering import EMGButterNotchFilterWide

#proto
import pison_cloud_bindings
from pison_cloud_bindings.src import pison_cloud
from pison_cloud.pison.common.cloud.v1.common_pb2 import ListQueryParameters
from pison_cloud.pison.common.cloud.v1.common_pb2 import DateRange

from pison_cloud.pison.session.cloud.v1 import session_pb2_grpc, session_pb2
from pison_cloud.pison.session.cloud.v1.session_pb2 import ReadSessionRequest

from pison_cloud.pison.authorization.cloud.v1.authorization_pb2_grpc import AuthorizationManagementServiceStub
from pison_cloud.pison.authorization.cloud.v1 import authorization_pb2

from pison_cloud.pison.readiness.cloud.v1.readiness_pb2_grpc import ReadinessServiceStub
from pison_cloud.pison.readiness.cloud.v1 import readiness_pb2 
from pison_cloud.pison.readiness.cloud.v1.readiness_pb2 import Readiness as ReadinessScoreProto, \
    ListReadinessRequest as ListReadinessScoreRequest, ListSortParams as ReadinessSortParams, ListPaginationParams as ReadinessPaginationParams

from pison_cloud.pison.agility_score.cloud.v1.agility_score_pb2_grpc import AgilityScoreServiceStub
from pison_cloud.pison.agility_score.cloud.v1 import agility_score_pb2 
from pison_cloud.pison.agility_score.cloud.v1.agility_score_pb2 import AgilityScore as AgilityScoreProto, \
    ListAgilityScoreRequest, ListSortParams as AgilitySortParams, ListPaginationParams as AgilityPaginationParams

from pison_cloud.pison.focus_score.cloud.v1.focus_score_pb2_grpc import FocusScoreServiceStub
from pison_cloud.pison.focus_score.cloud.v1 import focus_score_pb2_grpc, focus_score_pb2
from pison_cloud.pison.focus_score.cloud.v1.focus_score_pb2 import FocusScore as FocusScoreProto, \
    ListFocusScoreRequest, ListSortParams as FocusSortParams, ListPaginationParams as FocusPaginationParams

from pison_cloud.pison.device.cloud.v1.device_pb2_grpc import DeviceServiceStub
from pison_cloud.pison.device.cloud.v1 import device_pb2, device_pb2_grpc

from pison_cloud.pison.protocol.cloud.v1.protocol_pb2_grpc import ProtocolServiceStub
from pison_cloud.pison.protocol.cloud.v1 import protocol_pb2, protocol_pb2_grpc

<h1>Classes and Functions</h1>

In [None]:
class environment:
    """Deployment environment selection plus the settings derived from it.

    The instance is passed to the gRPC helper functions, which read the
    attributes set here: ``project_id``, ``env``, ``server_address``,
    ``audience``, ``dataset`` and ``table``.
    """

    # Environments offered by the prompt, in menu order.
    _ENVIRONMENTS = ("dev", "staging", "ops")

    def __init__(self, selected_env=None):
        """
        Pick an environment and derive its connection settings.

        :param selected_env: index into the environment menu
            (0 = dev, 1 = staging, 2 = ops). When ``None`` (the default) the
            menu is printed and the index is read from standard input.
        :raises ValueError: if the value cannot be converted to an integer
        :raises IndexError: if the index is outside the menu. Negative
            indices are rejected too, so that ``-1`` cannot silently select
            ``ops``.
        """
        list_of_envs = self._ENVIRONMENTS

        if selected_env is None:
            print("select environment:")
            for x, env in enumerate(list_of_envs):
                print(str(x) + ":" + env)
            selected_env = input()

        selected_env = int(selected_env)
        if not 0 <= selected_env < len(list_of_envs):
            raise IndexError(
                f"environment index {selected_env} is out of range "
                f"(expected 0-{len(list_of_envs) - 1})"
            )

        env = list_of_envs[selected_env]
        project_id = f'pison-{env}'

        # 'ops' is served from the bare domain; the others use a sub-domain.
        if env == 'ops':
            server_address = 'cloud.pison.io'
        else:
            server_address = f'{env}.cloud.pison.io'
        audience = f'pison-{env}'

        dataset = 'pison_dataset'
        table = 'sensor_data'

        print("")
        print("selected environment:")
        print(f'project_id: {project_id}')
        print(f'env: {env}')
        print(f'server_address: {server_address}')
        print(f'audience: {audience}')
        print(f'dataset: {dataset}')
        # Previously printed the audience here instead of the table name.
        print(f'table: {table}')

        self.project_id = project_id
        self.env = env
        self.server_address = server_address
        self.audience = audience
        self.dataset = dataset
        self.table = table

<h3>Services</h3>

<h4>Microservice Functions</h4>

In [None]:
class ResponseConverter(ABC):
    """
    Base class for callables that turn a gRPC response into a dataframe.

    The base implementation ignores the response and produces an empty
    frame; the converters in this file call it as their fallback.
    """

    def __call__(self, response):
        """
        Return an empty dataframe regardless of the response
        :param response: a response object (unused)
        :type response: grpc response
        :return: an empty dataframe
        :rtype: pandas.DataFrame
        """
        empty_frame = pd.DataFrame()
        return empty_frame
    
class UsersConverter(ResponseConverter):
    """
    Converter for responses that carry a `users` list
    (used in this file with the `ListUsers()` RPC call)
    """

    def __call__(self, response):
        """
        Convert a response object to pandas dataframe
        :param response: a response object
        :type response: grpc response
        :return: one row per entry of `users`, or an empty dataframe when the
            response cannot be converted
        :rtype: pandas.DataFrame
        """
        try:
            response_dict = MessageToDict(response)
            data_f = pd.json_normalize(response_dict['users'])
        except Exception:
            # Best effort: any conversion problem (e.g. no 'users' key) gives
            # the empty frame. `Exception` rather than a bare `except` so
            # KeyboardInterrupt/SystemExit still propagate.
            data_f = super().__call__(response)
        return data_f
    
class ReadinessConverter(ResponseConverter):
    """
    Converter for the `ListReadiness()` RPC call
    """

    def __call__(self, readiness_res):
        """
        Flatten the `scores` of a response into a pandas dataframe
        :param readiness_res: a response object
        :type readiness_res: grpc response
        :return: the flattened scores, with one row per `onsetMoments` entry
            when that column is present; an empty dataframe when the
            response has no `scores`
        :rtype: pandas.DataFrame
        """
        as_dict = MessageToDict(readiness_res)

        # Nothing to flatten: defer to the base class (empty frame).
        if "scores" not in as_dict:
            return super().__call__(readiness_res)

        scores_frame = pd.json_normalize(as_dict["scores"])
        if "onsetMoments" in scores_frame:
            scores_frame = scores_frame.explode("onsetMoments")
        return scores_frame


class BulkyReadinessConverter(ResponseConverter):
    """
    Converter for the `ListReadiness()` RPC call, assuming a
    response that includes bulk user data.
    """

    def __call__(self, readiness_res):
        """
        Convert a response object to pandas dataframe
        :param readiness_res: a response object exposing `scores_by_user`,
            a mapping whose values each carry a `scores` sequence
        :type readiness_res: grpc response
        :return: the flattened scores of all users, with one row per
            `onsetMoments` entry when that column is present; an empty
            dataframe when the response contains no scores
        :rtype: pandas.DataFrame
        """
        dfs = []
        for _, user_scores in readiness_res.scores_by_user.items():
            for score in user_scores.scores:
                flat_score = pd.json_normalize(MessageToDict(score))
                if "onsetMoments" in flat_score:
                    flat_score = flat_score.explode("onsetMoments")
                dfs.append(flat_score)

        # pd.concat() raises ValueError on an empty list, so a response
        # without scores falls back to the base class's empty frame, as the
        # other converters do.
        if not dfs:
            return super().__call__(readiness_res)
        return pd.concat(dfs)


class AgilityConverter(ReadinessConverter):
    """
    Converter for the `ListAgilityScore()` RPC call.

    Adds no behaviour of its own; conversion is inherited unchanged from
    `ReadinessConverter`.
    """


class BulkyAgilityConverter(BulkyReadinessConverter):
    """
    Converter for the `ListAgilityScore()` RPC call, assuming a
    response that includes bulk user data.

    Adds no behaviour of its own; conversion is inherited unchanged from
    `BulkyReadinessConverter`.
    """


class FocusConverter(ReadinessConverter):
    """
    Converter for the `ListFocusScore()` RPC call.

    Adds no behaviour of its own; conversion is inherited unchanged from
    `ReadinessConverter`.
    """


class BulkyFocusConverter(BulkyReadinessConverter):
    """
    Converter for the `ListFocusScore()` RPC call, assuming a
    response that includes bulk user data.

    Adds no behaviour of its own; conversion is inherited unchanged from
    `BulkyReadinessConverter`.
    """


class BaselineConverter(ResponseConverter):
    """
    Converter for the `ReadBaselineById()` RPC call
    """

    def __call__(self, baseline_res):
        """
        Convert a response object to pandas dataframe
        :param baseline_res: a response object
        :type baseline_res: grpc response
        :return: the transposed dataframe built from the response's dict form
        :rtype: pandas.DataFrame
        """
        as_dict = MessageToDict(baseline_res)
        frame = pd.DataFrame(as_dict)
        return frame.transpose()


class PlanConverter(ResponseConverter):
    """
    Converter for the `ReadPlan()` RPC call
    """

    # Colour channels flattened into `configuration_color_<channel>` columns.
    _COLOR_CHANNELS = ("red", "green", "blue")

    def __call__(self, plan_res):
        """
        Convert a response object to pandas dataframe
        :param plan_res: a response object
        :type plan_res: grpc response
        :return: one row per stimulus with the columns `timeInSeconds`,
            `configuration_color_red/green/blue` and
            `configuration_durationInSeconds`; an empty dataframe when the
            response has no plan or no stimuli
        :rtype: pandas.DataFrame
        """
        data = MessageToDict(plan_res)

        # Missing keys are treated as "no data" instead of raising KeyError.
        plan = data.get("plan") or {}
        stimuli = plan.get("stimuli")
        if not stimuli:
            return super().__call__(plan_res)

        # The colour channels were already defaulted to 0.0 when absent; the
        # same default is now applied to the remaining fields.
        # NOTE(review): presumably MessageToDict omits these fields when they
        # hold the proto3 default value -- confirm against the schema.
        rows = []
        for stimulus in stimuli:
            config = stimulus.get("configuration") or {}
            color = config.get("color") or {}

            row = {"timeInSeconds": stimulus.get("timeInSeconds", 0.0)}
            for channel in self._COLOR_CHANNELS:
                row[f"configuration_color_{channel}"] = color.get(channel, 0.0)
            row["configuration_durationInSeconds"] = config.get("durationInSeconds", 0.0)
            rows.append(row)

        columns = (
            ["timeInSeconds"]
            + [f"configuration_color_{channel}" for channel in self._COLOR_CHANNELS]
            + ["configuration_durationInSeconds"]
        )
        return pd.DataFrame(rows, columns=columns)
    
class SessionConverter(ResponseConverter):
    """
    Converter for responses that carry a `sessions` list
    (used in this file with the `ReadSession()` RPC call)
    """

    def __call__(self, session_res):
        """
        Convert a response object to pandas dataframe
        :param session_res: a response object
        :type session_res: grpc response
        :return: one row per (session, protocol execution) pair, with the
            execution's fields flattened into `protocolExecutions.<field>`
            columns; an empty dataframe when any step of the conversion
            fails (including when no `protocolExecutions` column exists)
        :rtype: pandas.DataFrame
        """
        try:
            response_dict = MessageToDict(session_res)
            data_f = pd.json_normalize(response_dict['sessions'])
            if 'protocolExecutions' in data_f:
                data_f = data_f.explode('protocolExecutions')

            # Normalize the dictionaries in the 'protocolExecutions' column
            normalized_column_df = pd.json_normalize(data_f['protocolExecutions'])

            # Rename the columns of the normalized column DataFrame to include the original key name
            # (only when the prefixed name is not already a column of data_f)
            renamed_columns = {}
            for col in normalized_column_df.columns:
                original_key = f"{data_f['protocolExecutions'].name}.{col}"
                if original_key not in data_f.columns:
                    renamed_columns[col] = original_key

            normalized_column_df = normalized_column_df.rename(columns=renamed_columns)

            # Reset index due to overlapping indices on same session id with a different protocol
            data_f.reset_index(drop=True, inplace=True)

            # Merge the normalized DataFrame with the original DataFrame
            data_f = pd.concat([data_f.drop(columns='protocolExecutions'), normalized_column_df], axis=1)
        except Exception:
            # Best effort: any conversion problem gives the empty frame.
            # `Exception` rather than a bare `except` so
            # KeyboardInterrupt/SystemExit still propagate.
            data_f = super().__call__(session_res)
        return data_f

class ProtocolConverter(ResponseConverter):
    """
    Converter for responses that carry a `protocols` list
    (used in this file with the `GetProtocol()` RPC call)
    """

    def __call__(self, protocol_res):
        """
        Convert a response object to pandas dataframe
        :param protocol_res: a response object
        :type protocol_res: grpc response
        :return: one row per entry of `protocols` with the `id` column
            renamed to `protocol_id`, or an empty dataframe when the
            response cannot be converted
        :rtype: pandas.DataFrame
        """
        try:
            response_dict = MessageToDict(protocol_res)
            data_f = pd.json_normalize(response_dict['protocols'])
            data_f.rename(columns={'id': 'protocol_id'}, inplace=True)
        except Exception:
            # Best effort: any conversion problem gives the empty frame.
            # `Exception` rather than a bare `except` so
            # KeyboardInterrupt/SystemExit still propagate.
            data_f = super().__call__(protocol_res)
        return data_f
    
class DeviceConverter(ResponseConverter):
    """
    Converter for responses that carry a `device` entry
    (used in this file with the `ReadDevice()` RPC call)
    """

    def __call__(self, device_res):
        """
        Convert a response object to pandas dataframe
        :param device_res: a response object
        :type device_res: grpc response
        :return: the flattened `device` data with `id` renamed to
            `device_uid` and `name` renamed to `device.deviceId`, or an
            empty dataframe when the response cannot be converted
        :rtype: pandas.DataFrame
        """
        try:
            response_dict = MessageToDict(device_res)
            data_f = pd.json_normalize(response_dict['device'])
            data_f.rename(columns={'id': 'device_uid', 'name': 'device.deviceId'}, inplace=True)
        except Exception:
            # Best effort: any conversion problem gives the empty frame.
            # `Exception` rather than a bare `except` so
            # KeyboardInterrupt/SystemExit still propagate.
            data_f = super().__call__(device_res)
        return data_f
        
class PisonGrpc:
    """
    Context-managed gRPC client.

    Entering the context fetches an identity token through `gcloud` and
    opens a TLS channel to the environment's server address; leaving it
    closes the channel. Calling the instance performs a single RPC with the
    token attached as a bearer `authorization` metadata entry.
    """

    def __init__(self, env_selection):
        """
        :param env_selection: object providing `server_address`, `audience`
            and `env` (an `environment` instance in this file)
        """
        self._target = env_selection.server_address
        self._audience = env_selection.audience
        # Stored on the instance so create_channel() uses the environment it
        # was given rather than a global that happens to be named
        # `env_selection`.
        self._env = env_selection.env
        self._id_token = None
        self._channel = None

    def __enter__(self):
        self.create_channel()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if self._channel is not None:
            self._channel.close()
            self._channel = None

    def create_channel(self):
        """
        Fetch an identity token and open the secure channel.

        :raises subprocess.CalledProcessError: if `gcloud` exits with a
            non-zero status (previously this produced an empty token)
        :raises FileNotFoundError: if `gcloud` is not on the PATH
        """
        # Option 1 (not used): with application-default credentials available
        # the token can be fetched directly via
        #   google.oauth2.id_token.fetch_id_token(
        #       google.auth.transport.requests.Request(), audience=self._audience)
        #
        # Option 2: Using gcloud installed to your machine with account impersonation; useful if you are on a local machine without pantheon installed, but you have gcloud
        #   Requires the following scopes requested from IT (bruno@pison.com)
        #   "Service Account User" (roles/iam.serviceAccountUser)
        #   "Service Account Token Creator" (roles/iam.serviceAccountTokenCreator)
        service_account = f"dashboard-service-account@pison-{self._env}.iam.gserviceaccount.com"

        # Argument list without a shell: no quoting or injection concerns.
        result = subprocess.run(
            [
                "gcloud",
                "auth",
                "print-identity-token",
                f"--impersonate-service-account={service_account}",
                f"--audiences={self._audience}",
            ],
            stdout=subprocess.PIPE,
            check=True,
        )
        # strip() removes the trailing newline printed by gcloud
        self._id_token = result.stdout.decode("utf-8").strip()

        # Create a channel.
        self._channel = grpc.secure_channel(self._target, grpc.ssl_channel_credentials())

    def __call__(self, service_stub, rpc_name, request, res_converter=None):
        """
        Invoke one RPC and optionally convert its response.

        :param service_stub: stub class, instantiated with the open channel
        :param rpc_name: name of the stub method to call
        :param request: request message passed to the RPC
        :param res_converter: optional callable applied to the response
        :return: dict with the raw response under `response` and, when a
            converter was given, its result under `dataframe`
        :rtype: dict
        """
        stub = service_stub(self._channel)
        rpc = getattr(stub, rpc_name)

        response = rpc(
            request,
            metadata=[
                ("authorization", f"Bearer {self._id_token}")
            ]
        )

        ret = {
            'response': response
        }

        if res_converter:
            ret['dataframe'] = res_converter(response)

        return ret

    @staticmethod
    def to_pb_timestamp(datetime):
        """
        Build a protobuf `Timestamp` from a datetime
        :param datetime: the datetime to convert
        :return: a `Timestamp` populated through `FromDatetime`
        """
        timestamp = Timestamp()
        timestamp.FromDatetime(datetime)
        return timestamp
    
def get_protocol_by_id(env_selection, protocol_id):
    """
    Fetch a single protocol through the `GetProtocol` RPC
    :param env_selection: environment settings used to open the connection
    :param protocol_id: uuid of the protocol to request
    :return: dict with the raw `response` and the converted `dataframe`
    :rtype: dict
    :raises grpc.RpcError: when the call fails; the error is logged first.
        The previous handler referenced `settings` and `capture_exception`,
        neither of which is defined in this file, so a failure surfaced as a
        NameError.
    """
    with PisonGrpc(env_selection) as rpc:
        try:
            # NOTE(review): the meaning of active=3 is not visible in this
            # file -- confirm against the protocol service definition.
            return rpc(protocol_pb2_grpc.ProtocolServiceStub, "GetProtocol",
                       protocol_pb2.GetProtocolRequest(uuid=protocol_id, active=3),
                       ProtocolConverter())
        except grpc.RpcError:
            logging.exception("GetProtocol failed for protocol_id=%s", protocol_id)
            raise
    
def get_all_protocols(env_selection):
    """
    Call the `GetProtocol` RPC with an empty (unfiltered) request
    :param env_selection: environment settings used to open the connection
    :return: dict with the raw `response` and the converted `dataframe`
    :rtype: dict
    """
    with PisonGrpc(env_selection) as client:
        unfiltered_request = protocol_pb2.GetProtocolRequest()
        return client(
            protocol_pb2_grpc.ProtocolServiceStub,
            "GetProtocol",
            unfiltered_request,
            ProtocolConverter(),
        )

def get_device_by_user_id(env_selection, user_id):
    """
    Call the `ReadDevice` RPC for one user
    :param env_selection: environment settings used to open the connection
    :param user_id: id of the user whose device is requested
    :return: dict with the raw `response` and the converted `dataframe`
    :rtype: dict
    """
    with PisonGrpc(env_selection) as client:
        device_request = device_pb2.ReadDeviceRequest(user_id=user_id)
        return client(
            device_pb2_grpc.DeviceServiceStub,
            "ReadDevice",
            device_request,
            DeviceConverter(),
        )
       
def read_session(env_selection, session_id):
    """
    Fetch a single session through the `ReadSession` RPC
    :param env_selection: environment settings used to open the connection
    :param session_id: uuid of the session to request
    :return: dict with the raw `response` and the converted `dataframe`
    :rtype: dict
    :raises grpc.RpcError: when the call fails; the error is logged first.
        The previous handler referenced `settings` and `capture_exception`,
        neither of which is defined in this file, so a failure surfaced as a
        NameError.
    """
    with PisonGrpc(env_selection) as rpc:
        try:
            return rpc(session_pb2_grpc.SessionServiceStub, "ReadSession",
                       session_pb2.ReadSessionRequest(uuid=session_id),
                       SessionConverter())
        except grpc.RpcError:
            logging.exception("ReadSession failed for session_id=%s", session_id)
            raise
                
def read_all_sessions(env_selection):
    """
    Call the `ReadSession` RPC with an empty (unfiltered) request
    :param env_selection: environment settings used to open the connection
    :return: dict with the raw `response` and the converted `dataframe`
    :rtype: dict
    """
    with PisonGrpc(env_selection) as client:
        unfiltered_request = session_pb2.ReadSessionRequest()
        return client(
            session_pb2_grpc.SessionServiceStub,
            "ReadSession",
            unfiltered_request,
            SessionConverter(),
        )
            
def get_plan_data(env_selection, score_df):
    """
    Fetch the plan for every score through the `ReadPlan` RPC
    :param env_selection: environment settings used to open the connection
    :param score_df: dataframe with a `uid` column; one request is made per
        value
    :type score_df: pandas.DataFrame
    :return: the concatenated plan dataframes, each tagged with its `uid`;
        an empty dataframe when no plan could be retrieved
    :rtype: pandas.DataFrame
    """
    plans = []

    with PisonGrpc(env_selection) as rpc:
        for uid in tqdm(score_df.uid):
            try:
                plan_res = rpc(ReadinessServiceStub, "ReadPlan",
                               readiness_pb2.ReadPlanRequest(uid=uid),
                               PlanConverter())
            except Exception as ex:
                # Still best effort (one failing uid must not abort the
                # batch), but no longer silent.
                logging.warning("ReadPlan failed for uid=%s: %s", uid, ex)
                continue

            this_df = plan_res['dataframe']
            this_df['uid'] = uid
            plans.append(this_df)

    # pd.concat() raises ValueError on an empty list.
    if not plans:
        return pd.DataFrame()
    return pd.concat(plans)

def get_pb_date_range(start, end):
    """
    Build a `DateRange` message from two datetimes
    :param start: datetime marking the start of the range
    :param end: datetime marking the end of the range
    :return: a `DateRange` whose `start` and `end` are protobuf timestamps
    """
    range_start = datetime_to_timestamp(start)
    range_end = datetime_to_timestamp(end)
    return DateRange(start=range_start, end=range_end)

def datetime_to_timestamp(datetime):
    """
    Build a protobuf `Timestamp` from a datetime
    :param datetime: the datetime to convert
    :return: a `Timestamp` populated through `FromDatetime`
    """
    # NOTE(review): presumably a naive datetime is interpreted as UTC by
    # FromDatetime -- confirm against the protobuf documentation.
    pb_timestamp = Timestamp()
    pb_timestamp.FromDatetime(datetime)
    return pb_timestamp


def timestamp_to_datetime(timestamp):
    """
    Convert a protobuf timestamp to a timezone-aware datetime
    :param timestamp: object exposing `ToDatetime(tzinfo=...)`
    :return: an aware datetime expressed in the machine's local timezone

    The previous body called `timezone.get_current_timezone()`, but no
    `timezone` name is defined in this file, so every call raised NameError.
    The value is now read as UTC and converted with `astimezone()`, which
    applies the local UTC offset valid at that instant.
    """
    return timestamp.ToDatetime(tzinfo=dt.timezone.utc).astimezone()

<h2>Get Device Email Pairings</h2>

In [None]:
# Query parameters
start_date = '2024-03-01 00:00:00'
end_date = '2024-03-25 23:59:59'
session_id_list = [] # list strings of session ids or []
protocol_list = [] # list of strings of protocols or []
username = None # partial string of username or None
limit = 1000000
dt_start_date = datetime.strptime(start_date, "%Y-%m-%d %H:%M:%S")  # Convert string to datetime object
dt_end_date = datetime.strptime(end_date, "%Y-%m-%d %H:%M:%S")  # Convert string to datetime object

# Prompts for the environment (dev / staging / ops)
env_selection = environment()

# Generate df of the session request from the API
df_session_response = read_all_sessions(env_selection)['dataframe']
df_session_response = df_session_response.rename(columns={'id':'session_id'})

# Fetch every user known to the authorization service
with PisonGrpc(env_selection) as rpc:
    users_res = rpc(AuthorizationManagementServiceStub, "ListUsers",
                    authorization_pb2.ListUsersRequest(), UsersConverter())

# dropna() removes every user row that has any missing field
all_user_df = users_res['dataframe'].dropna()

# Change user list accordingly if another list is required
# user_list_df = all_user_df[all_user_df.email.isin(user_of_interest)].reset_index(drop = True)
user_df = all_user_df
user_df = user_df.rename(columns={'uid':'userId'})

# Attach user details to the sessions. The merge fails when either frame has
# no 'userId' column (e.g. when no sessions came back); the sessions are then
# left as they are, and the reason is reported instead of being discarded.
try:
    df_session_response = pd.merge(df_session_response, user_df, on='userId', how='left')
except Exception as merge_err:
    print(f'could not attach user info to sessions: {merge_err!r}')

In [None]:
# Ad-hoc check: look up the device of one hard-coded user id; as the cell's
# last expression the returned dict is displayed by the notebook.
get_device_by_user_id(env_selection, 'dis8wInkIrT8YDPUkBUSo1frbcg1')

In [None]:
readiness_scores = []

# Get Readiness Scores for every user within the query date range
with PisonGrpc(env_selection) as rpc:
    filter_params = ListQueryParameters(
        # plain list of ids rather than a pandas Series
        user_ids=user_df['userId'].tolist(),
        date_range=get_pb_date_range(dt_start_date, dt_end_date)
    )

    request = ListReadinessScoreRequest(
        query_parameters=filter_params,
        sort=ReadinessSortParams(key='createdAt', ascending=False),
        pagination=ReadinessPaginationParams(limit=limit, offset=0)
    )

    readiness_res = rpc(ReadinessServiceStub, "ListReadiness", request, BulkyReadinessConverter())
    this_df = readiness_res['dataframe']
    if this_df.shape[0] > 0:
        readiness_scores.append(this_df)

if readiness_scores:
    readiness_df = pd.concat(readiness_scores)
else:
    # Previously None, which made the reset_index() call below fail with
    # AttributeError. An empty frame carrying the columns this notebook
    # reads ('userId' for the merge, 'sessionId' for the later filter)
    # keeps the following steps working on "no scores".
    readiness_df = pd.DataFrame(columns=['userId', 'sessionId'])
readiness_df.reset_index(inplace=True, drop=True)
df_readiness_full = pd.merge(readiness_df, user_df, on='userId', how='left')

In [None]:
# Keep only the readiness tests without a session id, i.e. the ones performed
# with the pison ready app
df_ready_tests = df_readiness_full.loc[df_readiness_full.sessionId.isna()]

In [None]:
# Look up the device of every user that has a ready-app test. Frames are
# collected in a list and concatenated once instead of growing a dataframe
# inside the loop.
device_frames = []
for user_id in df_ready_tests.userId.unique():
    try:
        device_frames.append(get_device_by_user_id(env_selection, user_id)['dataframe'])
    except Exception as err:
        # Best effort per user: report the failure and move on.
        # `Exception` rather than a bare `except` so the loop can still be
        # interrupted.
        print(f'{user_id} failed: {err!r}')

# pd.concat() raises ValueError on an empty list
readiness_device_df = pd.concat(device_frames) if device_frames else pd.DataFrame()

readiness_device_df

In [None]:
from pathlib import Path

# Pair each ready-app test with the device found for its user
device_df = pd.merge(readiness_device_df, df_ready_tests, on=['userId'], how='left')
# try except to handle merge of session ids when there are no pantheon sessions
try:
    device_df = pd.concat([device_df, df_session_response[['session_id', 'userId', 'device.deviceId', 'device.deviceVersion', 'email']]])
    device_df = device_df[['email','device.deviceId', 'device.deviceVersion', 'session_id']]
except Exception as err:
    # Fallback when the session columns are unavailable: pair devices with
    # users only. `Exception` rather than a bare `except` so the cell can
    # still be interrupted.
    print(f'session data not merged ({err!r}); falling back to user/device pairs')
    device_df = pd.concat([device_df, user_df[['userId', 'email']]])
    device_df = device_df[['email','device.deviceId']]
device_df = device_df.drop_duplicates()
device_df.reset_index(inplace=True, drop=True)
display(device_df)

# to_csv() does not create missing directories
output_dir = Path('device_user_pairing')
output_dir.mkdir(parents=True, exist_ok=True)
device_df.to_csv(output_dir / f'device_df_{env_selection.env}.csv')