In [1]:
import boto3 

from boto3.dynamodb.conditions import Key, Attr

import pandas as pd

import os

import time

from IPython.display import clear_output

In [2]:
# Creates the boto3 DynamoDB service resource that the later cells use to reach the table
dynamodb = boto3.resource( "dynamodb" )

In [3]:
# Gets a handle on the 'ads_passenger_processed' table (the Drive Ohio table that is queried further down)
adspp_table = dynamodb.Table( "ads_passenger_processed" )

In [4]:
# Reads the .csv of groupMetadataIDs into a Pandas data frame. Each groupMetadataID is the unique ID assigned to one of the
# 330 individual test drives done by Drive Ohio, so the file holds 330 of them.
#
# 'groupMetadataID_list_updated.csv' is just a renamed copy of the 'ads_data_index.csv' file found in the
# Students only -> Files -> Fall 2024 directory of Teams.
gmID_df = pd.read_csv(
    filepath_or_buffer = "/home/rcoldren_linux/Desktop/Data/groupMetadataID_list_updated.csv"
)

In [5]:
# Pulls the 'groupMetadataID' column out of the data frame above as a plain Python list (all 330 IDs).
# Any shorter list of groupMetadataIDs works here too; it does not have to be every one of them.
gmID_list = gmID_df.loc[ :, "groupMetadataID" ].tolist()

In [6]:
# The topics to pull out of the 'ads_passenger_processed' table for every groupMetadataID in 'gmID_list'
topic_list = [
    "/apollo/canbus/chassis",
    "/apollo/sensor/gnss/best_pose",
]

In [7]:
# Path to one's desktop in AWS; the extracted data is written underneath it
desktop_path = "/home/rcoldren_linux/Desktop"

In [30]:
# Downloads, for every groupMetadataID in 'gmID_list' and every topic in 'topic_list', all matching rows of the
# 'ads_passenger_processed' table and saves them as one .csv file per (groupMetadataID, topic) pair under
# '<desktop_path>/Raw_Data/<groupMetadataID>/<topic with "/" replaced by "_">/'. A progress indicator with an
# estimated time remaining is printed after each groupMetadataID.

delta_times = []

# ^^^ Initializes a list for storing the amount of time (in min) it took to query for each groupMetadataID in 'gmID_list'

count = 0

# ^^^ Initializes a counting variable to indicate how many of the N groupMetadataIDs in 'gmID_list' have been queried so far
#     in the loop below

for gmID in gmID_list:

    # ^^^ Iterates through each groupMetadataID in 'gmID_list'

    time1 = time.time()

    # ^^^ Records the time at the beginning of each groupMetadataID iteration, in sec

    for topic in topic_list:

        # ^^^ Iterates through each topic to be extracted (for the groupMetadataID of the outer loop's current iteration)

        # vvv Creates the parameters for a query to extract the rows containing a particular groupMetadataID and topic (given by
        #     the current 'gmID' and 'topic' of the two for loops) from the 'ads_passenger_processed' table

        query_input = dict(
                           IndexName = 'groupMetadataID-index',

                           # ^^^ First, chooses the groupMetadataID index as the index for the query

                           KeyConditionExpression = Key( 'groupMetadataID' ).eq( f'{ gmID }' ),

                           # ^^^ Second, tells the query to extract the rows containing the current groupMetadataID

                           FilterExpression = Attr( 'topic' ).eq( f'{ topic }' ),

                           # ^^^ Third, tells the query to discard the extracted rows whose topic is not the current topic
                          )

        query_items = []

        # ^^^ Collects the raw rows from every page of the query. The rows are turned into a data frame only once, after the
        #     last page, instead of building and concatenating one data frame per page (which re-copies all earlier rows on
        #     every page).

        while True:

            query_output = adspp_table.query( **query_input )

            # ^^^ Performs the query (or its continuation) on the 'ads_passenger_processed' table

            query_items.extend( query_output[ 'Items' ] )

            if 'LastEvaluatedKey' not in query_output:

                break

            # ^^^ A response without 'LastEvaluatedKey' is the final page. Otherwise the query stopped early (each call is
            #     limited to 1 MB), so another call is needed. A page can hold no matching rows and still not be the last one,
            #     so the loop is driven by the key alone and never by the page being empty.

            query_input[ 'ExclusiveStartKey' ] = query_output[ 'LastEvaluatedKey' ]

            # ^^^ Tells the next query to start where this one stopped

        query_output_df = pd.json_normalize( query_items )

        # ^^^ Puts all the rows extracted by the query into a single Pandas data frame ('json_normalize' already returns a
        #     data frame, so no further wrapping is needed)

        dir_friendly_topic = topic.replace( '/', '_' )

        # ^^^ Creates a slightly modified version of the current topic so it is able to be used as part of the name of a folder
        #     (forward slashes are not allowed in folder names because they act as path separators)

        output_dir = f'{ desktop_path }/Raw_Data/{ gmID }/{ dir_friendly_topic }'

        os.makedirs( output_dir, exist_ok = True )

        # ^^^ Creates the directory structure (starting at one's desktop in AWS) that stores the extracted current topic data
        #     for the current groupMetadataID, if it does not exist already

        csv_name = f'{ gmID }{ dir_friendly_topic }'

        # ^^^ Creates a name for the .csv file that will hold the extracted current topic data for the current groupMetadataID.
        #     It is built with an f-string (like the query and the paths) so that it also works if the groupMetadataIDs were
        #     not read in as strings.

        query_output_df.to_csv( f'{ output_dir }/{ csv_name }.csv', index = False )

        # ^^^ Saves the fully extracted current topic data for the current groupMetadataID as a .csv file in the aforementioned
        #     directory structure

    time2 = time.time()

    # ^^^ Records the time at the (approximate) end of each groupMetadataID iteration, in sec

    count = count + 1

    # ^^^ Indicates that one more groupMetadataID has just been queried

    delta_time = ( time2 - time1 ) / 60

    # ^^^ Calculates how long it took to query for the current groupMetadataID's requested topics (in min)

    delta_times.append( delta_time )

    # ^^^ Keeps track of how long the current iteration took, alongside all the previous iterations

    avg_delta_time = sum( delta_times ) / count

    # ^^^ Calculates the average time to complete one iteration (in min) given how long the current and previous iterations took

    est_time_left = avg_delta_time * ( len( gmID_list ) - count )

    # ^^^ Assuming the remaining iterations take exactly as long as the 'avg_delta_time', calculates the amount of time remaining
    #     until all the groupMetadataIDs in 'gmID_list' have been queried for (in min)

    print( f'Progress: { count } gmID out of { len( gmID_list ) }' )
    print( f'Estimated Time Left: { est_time_left } min' )

    # ^^^ Prints how many groupMetadataIDs have been queried for so far out of the total number of groupMetadataIDs in 'gmID_list'
    #     and how much time is estimated to be left (in min)

    clear_output( wait = True )

    # ^^^ Because of 'wait = True', the progress indicator just printed stays visible until the next one is printed, which
    #     then replaces it

Progress: 330 gmID out of 330
Estimated Time Left: 0.0 min
