# Download Data from NA

A new approach to download the data from the NanoAvionics server that hopefully doesn't use their stupid classes

In [1]:
# import necessary libraries
import requests
from requests.auth import HTTPBasicAuth
from requests.auth import HTTPDigestAuth
from requests_oauthlib import OAuth1
from tqdm.auto import tqdm
import os
import json
from IPython.display import clear_output as clear

import numpy as np
import raadpy as rp

In [2]:
# Send a data download request using REST
class RestOperations:
    """Send a data download request using the REST Protocol
    """
    # Initialize with the link
    def __init__(self, apiEndPoint, **kwargs):
        """Constructor

        Args:
            apiEndPoint (string): the url needed to make the request
        """
        self.apiEndPoint = apiEndPoint
        self.kwargs = kwargs
    
    def SendGetReq(self):
        """Send a download request to the URL

        Returns:
            json: A json file with all the downloaded data
        """
        # Get the needed authorization information
        auth = self.CallAuth(self.kwargs)

        # Make the request
        RespGetReq = requests.get(self.apiEndPoint, auth = auth, stream=True)

        # Check for errors
        if RespGetReq.status_code != 200:
            RespGetReq.raise_for_status()
            raise RuntimeError(f"Request to {self.apiEndPoint} returned status code {RespGetReq.status_code}")

        # Convert the output to a json and return
        return json.loads(RespGetReq.text)

    def CallAuth(self, OptionalAttrs):
        """Handle authorization stuff

        Args:
            OptionalAttrs (_type_): The necessary arguments needed for the type of authorization

        Returns:
            auth: An authorization object
        """
        authType = self.ValidateAuthAttrs(OptionalAttrs)
        if not authType:
            auth = None            
        elif authType == 'token':
            auth = HTTPBearerAuth(OptionalAttrs.get('token'))
        elif authType == 'basic':
            auth = HTTPBasicAuth(OptionalAttrs.get('username'), OptionalAttrs.get('password'))
        elif authType  == 'digest':
            auth = HTTPDigestAuth(OptionalAttrs.get('username'), OptionalAttrs.get('password'))
        elif authType  == 'oa1':
            auth = OAuth1(OptionalAttrs.get('AppKey'), OptionalAttrs.get('AppSecret'), OptionalAttrs.get('UserToken'), OptionalAttrs.get('UserSecret'))
        return auth
    
    def ValidateAuthAttrs(self, OptionalAttrs):
        """Make sure the optinal attributes of this class exist
        """
        if 'authType' not in OptionalAttrs:
            authType = None
        else:
            if OptionalAttrs.get('authType') not in ['token', 'digest', 'basic', 'oa1']:
                raise ValueError("Unknown authType received", OptionalAttrs.get('authType'))
            else:
                if OptionalAttrs.get('authType') == 'token' and 'token' not in OptionalAttrs:
                    raise ValueError("authType 'token' requires token")
                elif OptionalAttrs.get('authType') == 'basic' and not all(attr in OptionalAttrs for attr in ['username', 'password']):
                    raise ValueError("authType 'basic' requires username, password")
                elif OptionalAttrs.get('authType') == 'digest' and not all(attr in OptionalAttrs for attr in ['username', 'password']):
                    raise ValueError("authType 'digest' requires username, password")
                elif OptionalAttrs.get('authType') == 'oa1' and not all(attr in OptionalAttrs for attr in ['AppKey', 'AppSecret', 'UserToken' 'UserSecret']):
                    raise ValueError("authType 'oa1' requires AppKey, AppSecret, UserToken, UserSecret")
                else:
                    authType = OptionalAttrs.get('authType')
        return authType

class HTTPBearerAuth(requests.auth.AuthBase):
    '''requests() does not support HTTP Bearer tokens authentication, create one'''
    def __init__(self, token):
        self.token = token
    def __eq__(self, other):
        return self.token == getattr(other, 'token', None)
    def __ne__(self, other):
        return not self == other
    def __call__(self, r):
        r.headers['Authorization'] = 'Bearer ' + self.token
        return r

In [3]:
# Download tests

# Generate some variables
fileName="pc_se0_log"
fileName="pc_buff2"
host="https://light1.mcs.nanoavionics.com"
token="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoia2hhbGlmYSIsImV4cCI6MTcwNDA2NzIwMCwiZW1haWwiOiJhZGcxMUBueXUuZWR1In0.LiV8bfKb2JUG2eIIxouXKebQpPFLXewO1BqoOD22xS4"

# Download a range of data based on some limit
def download_range(url:str,token,limit:int=5000,VERBOSE:bool=False):
    """Downloads a range of data given a url and a token from the NA servers. 
    Automatically handles large file sizes.

    Args:
        url (str): the url from the NA server with the data to download from 
        token (str): The string value of the token for security authentication
        limit (int, optional): Number of rows to download at one go. Large numbers make the server crash. Defaults to 5000.
        VERBOSE (bool, optional): If true update statistics are printed while the fies is being downloaded. Defaults to False.

    Returns:
        data (list): a list of the binary strings of the downloaded data
    """

    # store the result
    data        = []
    last_data   = []
    seq         = -1
    cnt         = 0

    # Keep downloading until there is nothing left
    while True:
        # Print how much data you have downloaded
        clear(wait=True)
        if VERBOSE: 
            print('Current File: ',url,'\nEntries Downloaded:',len(data),'\nLast Sequence Number:',seq,'\nIterations:',cnt)
            # find the number of bytes per entry
            print('Bytes per entry: ',np.unique([len(d) for d in data]))
            cnt+=1

        # Do the REST stuff
        rest = RestOperations(url+f'&limit={limit}&seq_nr=gte.{seq}', authType = 'token', token = token)
       
        # Download the data
        last_data   = rest.SendGetReq()
        data        += last_data

        # If there are no more data exit
        if len(last_data) < limit or seq == max([datum['seq_nr'] for datum in data]):
            return data
        
        # Find the last sequence number
        seq = max([datum['seq_nr'] for datum in data])


# Create a rest request
# rest = RestOperations(f'{host}/{fileName}_download?archived_ts', authType = 'token', token = token)

# Download the data using the request
# data = rest.SendGetReq()

# data = download_range(f'{host}/{fileName}_download?seq_nr=lte.1000000',token,VERBOSE=True)

# print(len(data),data)

In [4]:
# Working functions

# Order the data according to entry number
def sort(data,field='entry_nr'):
    """Sort the data based on a metadata field

    Args:
        data (array of dictionaries): The array of dictionaries from the downloaded data
        field (str, optional): The metadata field to sort according to. Defaults to 'entry_nr'.

    Returns:
        sorted: Sorted list of lists
    """
    if len(data) <= 1: return data
    
    # Get the indices
    idx = np.argsort([d[field] for d in data])
    
    # Sorted array
    sorted = [data[idx[i]] for i in range(len(data))]

    return sorted

# Download data based on various keys
def download_file_ver(buffer:int = 1, file_ver=1):
    """Download a data from NA server with a common file version

    Args:
        buffer (int, optional): The buffer to download. Defaults to 1.
        file_ver (int, optional): The file version number. Defaults to 1.

    Returns:
        data: list of dictionaries with the rows
    """
    # Generate some variables
    fileName="pc_buff"+str(buffer)
    host="https://light1.mcs.nanoavionics.com"
    token="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoia2hhbGlmYSIsImV4cCI6MTcwNDA2NzIwMCwiZW1haWwiOiJhZGcxMUBueXUuZWR1In0.LiV8bfKb2JUG2eIIxouXKebQpPFLXewO1BqoOD22xS4"
    url = f'{host}/{fileName}_download?file_ver=eq.{file_ver}'

    # Download the data using segmented download
    data = download_range(url,token,VERBOSE=True)

    # Sort the data
    data = sort(data)

    return data

# Download data based on various keys
def download_log(start:str=None,end:str=None):
    """Download a log file from the NA version

    Args:
        file_ver (int, optional): The file version number. Defaults to 1.

    Returns:
        data: list of dictionaries with the rows
    """
    # Generate some variables
    fileName="pc_se0_log"
    host="https://light1.mcs.nanoavionics.com"
    token="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoia2hhbGlmYSIsImV4cCI6MTcwNDA2NzIwMCwiZW1haWwiOiJhZGcxMUBueXUuZWR1In0.LiV8bfKb2JUG2eIIxouXKebQpPFLXewO1BqoOD22xS4"
    url = f'{host}/{fileName}_download?'
    if start is not None: 
        url += f'archived_ts=gte.{start}'
        if end is not None: url += f'&archived_ts=lt.{end}'
    elif end is not None: url += f'archived_ts=lt.{end}'

    # Download the data using segmented download
    data = download_range(url,token,VERBOSE=True)

    # Sort the data
    data = sort(data)

    return data

# Download data based on time range
def download_time_delta(buffer:int = 1, start:str=None, end:str=None):
    """Download NA data on a time interval 

    Args:
        buffer (int, optional): The buffer number. Defaults to 1.
        start (str, optional): String with iso date to start. Defaults to '2022-06-01T00:00:00'.
        end (str, optional): String with iso date to end. Defaults to '2022-06-07T00:00:00'.

    Returns:
        data: list of dictionaries with the rows
    """
    # Generate some variables
    fileName="pc_buff"+str(buffer)
    host="https://light1.mcs.nanoavionics.com"
    token="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoia2hhbGlmYSIsImV4cCI6MTcwNDA2NzIwMCwiZW1haWwiOiJhZGcxMUBueXUuZWR1In0.LiV8bfKb2JUG2eIIxouXKebQpPFLXewO1BqoOD22xS4"
    url = f'{host}/{fileName}_download?'
    if start is not None: 
        url += f'archived_ts=gte.{start}'
        if end is not None: url += f'&archived_ts=lt.{end}'
    elif end is not None: url += f'archived_ts=lt.{end}'

    # Download the data using segmented download
    data = download_range(url,token,VERBOSE=True)

    # Sort the data
    data = sort(data)

    return data

# Save this data to a file to avoid having them in memory
def save_raw_data(data,filepath:str='./',buffer:int=1):
    """Save the raw data to a file in the computer

    Args:
        data (_type_): The raw data downloaded from NA server
        filepath (str, optional): The path that you want to save the file to. Defaults to './'.
        buffer (int, optional): The buffer number. Defaults to 1.

    Returns:
        string: The filename of the file.
    """
    # Create the filename
    timestamp   = '2022-NA-NAT' if len(data) == 0 else data[0]['archived_ts']
    date        = timestamp[0:timestamp.index('T')]
    filename    = filepath + f'light1-{date}-buff{buffer}.dat'

    # Load the file to write the output
    file = open(filename,'wb')

    # Append the data
    for row in data:
        # Convert the hexadecimal entry to bytes
        entry = bytes.fromhex(row['entry_data'][2:])
        file.write(entry)
    
    # Close the file
    file.close()

    # Return the filename if you need it
    return filename

# Convert from binary
def log_to_ascii(data,fileName:str=None):
    """Decode binary log file to ascii

    Args:
        data (dictionary): The dictionary obtained from the downloaded NA code
        fileName (str, optional): Filename to export the logfile to. If None then the file is not exported. Defaults to None.

    Returns:
        str: The decoded logfile as a string
    """
    # Store the full decoded text here
    full_text = ''

    # For every line in the logfile
    for entry in data:
        line =  bytes.fromhex(entry['entry_data'][2:]).decode("ASCII")
        full_text += line

    # If you need to store do so
    if fileName is not None: 
        file = open(fileName,'w')
        file.write(full_text)
        file.close()

    # Return the full text
    return full_text

# Parse a logfile and obtain metadata
def log_expand(filename:str=None,text:str=None):
    """Gets a logfile and decodes it to a list of commands. 
    If a text value is given then it decodes the text, if not, it then decodes the value from the filename

    Args:
        text (str, optional): The text of the logfile. Defaults to None.
        filename (str, optional): The filename of the file where the logfile is. Defaults to None.

    Raises:
        BaseException: If both parameters are left as None, then nothing happens. 

    Returns:
        decoded_logfile (list): List of lists. Each entry is a tuple with a command and a list for the outputs. 
    """

    # Do some argument processing:
    if filename is not None:
        # Load the logfile
        logfile = open(filename)

        # Load the lines
        loglines = logfile.readlines()

        # Close the file
        logfile.close()

    elif text is not None:
        loglines = text.split('\n')

    else: raise BaseException("Please enter input")

    # Add an SE0> line at the end if it doesn't exist
    if "SE0>" not in loglines[-1]: loglines.append("SE0>")

    # Decode the file
    # Find the indices of hte command lines
    commands_idx = [i for i,line in enumerate(loglines) if 'SE0>' in line]
    
    # Collect the outputs of the commands
    decoded_log = [[loglines[commands_idx[i]],loglines[commands_idx[i]+1:commands_idx[i+1]]] for i in range(len(commands_idx)-1)]

    # Return
    return decoded_log
    

# Parse custom command from satellite
def parse_custom_scenario(cmd:str):
    """Parses a custom scenario command message string to a dictionary of decoded hex values

    Args:
        cmd (str): Teh command message

    Returns:
        dict: The dictionary with outputs of all the relevant parameters set for the particular payload
    """
    # Store the data in a dictionary
    data = {}

    # Decode the information from the string
    data['hv']          = int(cmd[0:4],base=16)
    data['veto_hv']     = int(cmd[4:8],base=16)
    data['ch0_thresh']  = int(cmd[10:12]+cmd[8:10],base=16)
    data['ch1_thresh']  = int(cmd[14:16]+cmd[12:14],base=16)
    data['ch2_thresh']  = int(cmd[18:20]+cmd[16:18],base=16)
    data['ch3_thresh']  = int(cmd[22:24]+cmd[20:22],base=16)

    return data


# Obtain the metadata from a parsed logfile
def log_metadata(decoded_log:list):

    # metadata array initialization
    metadata = {
        'start_time':       None,
        'end_time':         None,
        'hv_SiPM':          -1,
        'hv_PMT':           -1,
        'hv_veto_SiPM':     -1,
        'hv_veto_PMT':      -1,
        'thresholds_SiPM':{
            'channel_0':    0,
            'channel_1':    0,
            'channel_2':    0,
            'channel_3':    0,
        },
        'thresholds_PMT':{
            'channel_0':    0,
            'channel_1':    0,
            'channel_2':    0,
            'channel_3':    0,
        },
        'custom_scenario_PMT': -1,
        'custom_scenario_SiPM': -1
    }

    # Get the command list
    commands = [row[0] for row in decoded_log]

    # Find the start and end of the data acquisition
    # Index of start and end timestamps:
    start = [i for i in range(len(commands)) if "rtc read" in commands[i]]
    if len(start) != 0: metadata['start_time']  = decoded_log[start[0] ][1][0][-21:-2]
    if len(start) >= 2: metadata['end_time']    = decoded_log[start[-1]][1][0][-21:-2]

    # Find the custom scenario commands for SiPM and PMT
    for num,payload in zip([12,13],['SiPM','PMT']):
        # Get all the commands with the custom scenario
        custom_commands = np.unique([commands[i] for i in range(len(commands)) if f"csp txrx {num} 9 3000" in commands[i]])
        
        # If there are any, decode them and replace
        if len(custom_commands) != 0: 
            message = custom_commands[0].split(' ')[-1][:-1]
            data    = parse_custom_scenario(message)

            # Update the decoded data to the metadata
            metadata['hv_'+payload]                         = data['hv']
            metadata['hv_veto_'+payload]                    = data['veto_hv']
            metadata['thresholds_'+payload]['channel_0']    = data['ch0_thresh']
            metadata['thresholds_'+payload]['channel_1']    = data['ch1_thresh']
            metadata['thresholds_'+payload]['channel_2']    = data['ch2_thresh']
            metadata['thresholds_'+payload]['channel_3']    = data['ch3_thresh']
            metadata['custom_scenario_'+payload]            = message
        

    # Return the metadata
    return metadata
            

# Download script packet
def download_data_packet(start:str=None,end:str=None,filepath:str='./'):
    """Download a packet of data from light-1 NA Server. This is the main library used.

    Args:
        start (str, optional): The start timestamp iso. Defaults to None.
        end (str, optional): The end timestmap in iso. Defaults to None.
        filepath (str, optional): The filepath to save everyhing. Defaults to './'.

    Returns:
        str: list of filenames to return
    """
    
    # Create a directory to store all this data
    if start is not None: filepath += 'light1-'+start[:start.index('T')]+'/'
    else: filepath += 'light1-data/'
    os.mkdir(filepath)

    # List that holds all the filenames
    filenames = []

    # First go ahead and download all the buffers
    for i in tqdm(range(1,10),desc='Downloading Buffer'):
        # Download the data of the buffer
        data    = download_time_delta(buffer=i,start=start,end=end)

        # Save the data of the buffer
        fname   = save_raw_data(data,filepath=filepath,buffer=i)
        filenames.append(fname)

    # Download the script log
    log         = download_log(start=start,end=end)
    if start is not None: log = log_to_ascii(log,fileName=filepath+'light1-'+start[:start.index('T')]+'-se-log.txt')
    else: log = log_to_ascii(log,fileName=filepath+'light1-se-log.txt')
    decoded_log = log_expand(text=log)

    # Extract the metadata from the logfile
    metadata = log_metadata(decoded_log=decoded_log)

    # Save the datafile as a json on the same directory
    with open(filepath + "metadata.json","w") as meta_file: json.dump(metadata,meta_file,indent=4)

    return metadata


In [5]:
filenames = download_data_packet(start='2022-08-28T00:00:00', end='2022-08-29T23:00:00')

Entries Downloaded: 0 
Last Sequence Number: -1 
Iterations: 0
Bytes per entry:  []


In [5]:
filenames = download_data_packet()

Entries Downloaded: 5000 
Last Sequence Number: 40791 
Iterations: 1
Bytes per entry:  [5]


AttributeError: 'NoneType' object has no attribute 'index'