Run the config file to authenticate script and query Key Vault

Second script. Run after 'Query_Activites'

In [None]:
%run config

Dependencies 

In [None]:
from pyspark.sql.functions import * 
from pyspark.sql import functions as F
from pyspark.sql import Row
import pandas as pd

### API Query to get more specific details for each activity, need to pass each activity off individually 

Get the full activity dataset from what is written in storage; this should contain all activities

In [None]:

# Read the stored activity data (Delta format) from historical_activity_id_path.
full_activity_dataset = (
    spark.read
    .format("delta")
    .load(historical_activity_id_path)
)

Grab all of the distinct activity IDs

In [None]:
# Collect the distinct values of the single 'activity_ids' column into a plain Python list.
full_activity_ids = [
    row[0]
    for row in full_activity_dataset.select('activity_ids').distinct().collect()
]

In [None]:
def query_segments(activity_ids : list):
    """Fetch the segment efforts recorded on each submitted Strava activity.

    One ``GET https://www.strava.com/api/v3/activities/{id}`` request is made
    per id with ``include_all_efforts`` enabled, and one
    ``(segment_id, activity_id)`` pair is collected for every entry of the
    response's ``segment_efforts`` list.

    Relies on the notebook globals ``spark``, ``requests`` and
    ``access_token`` (presumably provided by ``%run config`` -- TODO confirm).

    Args:
        activity_ids: Activity ids to query. One API request is issued per id,
            so the caller is responsible for keeping the list within the
            API rate limit.

    Returns:
        A de-duplicated Spark DataFrame with the columns
        ``Activity_Segment_JointID`` (``segment_id`` concatenated with
        ``activity_id``), ``segment_id``, ``activity_id``,
        ``ingest_file_name`` and ``ingested_at``. The frame has zero rows
        when ``activity_ids`` is empty or no activity reported any effort.

    Raises:
        requests.HTTPError: If the API answers with an error status. Failing
            loudly here keeps an error response from being mistaken for
            "this activity has no segments".
    """
    base_url = "https://www.strava.com/api/v3/activities/"
    # Request settings do not change between activities, so build them once.
    header = {'Authorization': 'Bearer ' + access_token}
    # include_all_efforts is sent as a regular query parameter instead of being
    # hand-written into the URL (the original produced "include_all_efforts= True").
    # per_page/page are retained from the original request.
    # NOTE(review): they are probably ignored by this endpoint -- confirm.
    param = {'include_all_efforts': 'true', 'per_page': 200, 'page': 1}

    effort_rows = []
    for activity in activity_ids:
        response = requests.get("{}{}".format(base_url, activity), headers=header, params=param)
        response.raise_for_status()
        my_activity = response.json()

        # A successful response without efforts simply contributes no rows.
        for effort in my_activity.get('segment_efforts') or []:
            # NOTE(review): effort['id'] is the id of the segment_efforts entry
            # itself; kept exactly as the original stored it under "segment_id".
            # Confirm whether effort['segment']['id'] was intended instead.
            effort_rows.append((effort['id'], effort['activity']['id']))

    # An explicit schema lets an empty result still produce a DataFrame
    # (Spark cannot infer a schema from an empty dataset).
    segment_spark_df = spark.createDataFrame(
        effort_rows, schema="segment_id long, activity_id long"
    )

    # drop duplicate entries
    segment_spark_df = segment_spark_df.dropDuplicates()

    segment_spark_df = segment_spark_df.select(
        concat(segment_spark_df.segment_id, segment_spark_df.activity_id).alias("Activity_Segment_JointID"),
        'segment_id',
        'activity_id',
    )

    segment_spark_df = segment_spark_df.withColumn("ingest_file_name", lit("segment_efforts_ids")) \
                                .withColumn("ingested_at", lit(current_timestamp()))

    return segment_spark_df

In [None]:
# Activity ids whose segments were already queried and written to storage.
segments_in_storage = spark.read.format("delta").load(segment_effort_path)
activity_ids_with_queried_segments = segments_in_storage.select('activity_id').distinct().rdd.flatMap(lambda x: x).collect()

# A set makes each membership test constant-time; the list comprehension keeps
# the original ordering of full_activity_ids.
already_queried_ids = set(activity_ids_with_queried_segments)
activity_ids_without_queried_segments = [x for x in full_activity_ids if x not in already_queried_ids]

# Cap the batch so as not to overload the API (one request is made per activity).
MAX_ACTIVITIES_PER_RUN = 99
eligible_activities = activity_ids_without_queried_segments[:MAX_ACTIVITIES_PER_RUN]

segment_id_df = query_segments(eligible_activities)

In [None]:
# Activities from this batch that returned at least one segment row.
returned_activity_ids = segment_id_df.select("activity_id").distinct().rdd.flatMap(lambda x: x).collect()

# Activities submitted that did not return segments (set lookup instead of a list scan).
returned_activity_lookup = set(returned_activity_ids)
activity_ids_without_segments = [x for x in eligible_activities if x not in returned_activity_lookup]

# Placeholder rows for activities without segments, so they are recorded in
# storage and not queried again. The activity id doubles as the joint id.
rows = [Row(Activity_Segment_JointID=i,  activity_id = i) for i in activity_ids_without_segments]
if rows:
    new_df = spark.createDataFrame(rows)
else:
    # Every submitted activity returned segments: Spark cannot infer a schema
    # from an empty list, so spell it out.
    # NOTE(review): assumes activity ids are integers -- confirm.
    new_df = spark.createDataFrame([], schema="Activity_Segment_JointID long, activity_id long")

new_df = new_df.withColumn("ingest_file_name", lit("segment_efforts_ids")) \
                                .withColumn("ingested_at", lit(current_timestamp()))\
                                .withColumn("segment_id", lit(None).cast("long"))

# union() matches columns by position, so reorder to the layout of segment_id_df first.
new_df_reordered = new_df.select(*segment_id_df.columns)

# Union the two datasets together.
all_segment_ids = segment_id_df.union(new_df_reordered)

In [None]:
# Preview up to 20 rows of the frame returned by query_segments for this batch.
segment_id_df.show(20)


In [None]:
# Row count per ingested_at value, largest group first.
value_counts = (
    segment_id_df
    .groupBy("ingested_at")
    .agg(F.count("ingested_at").alias("count"))
    .orderBy("count", ascending=False)
)
value_counts.show()

In [None]:
# Number of distinct activity ids in the combined frame (segment rows plus placeholder rows).
all_segment_ids.select("activity_id").distinct().count()

In [None]:
# Append this batch to the Delta data at segment_effort_path.
# NOTE(review): write_dataframe_to_storage is defined in a later cell of this
# notebook; unless `%run config` also provides it, that cell must be executed
# before this one on a fresh kernel.
write_dataframe_to_storage(all_segment_ids,segment_effort_path, "mergeSchema", "append" )

In [None]:
# Preview up to 20 rows of the combined frame.
all_segment_ids.show(20)

Take unique activity ids, and extract all of the segments associated with those activities

Need to compare activities already stored with segments, as there are limits for the Strava API

Initially will go with 99 requests, but might need to reduce that to save requests for segments

Query all segments from all activities 

Not all activities register segments; those activities will need to be appended into the final DF that is written to storage with placeholder values so as not to keep querying them

In [None]:
# Print the column names and types of the frame returned by query_segments.
segment_id_df.printSchema()

In [None]:
# Preview the combined frame using show()'s default row limit.
all_segment_ids.show()

In [None]:
def write_dataframe_to_storage(dataset, storage_path, option, mode):
    """Persist ``dataset`` in Delta format at ``storage_path``.

    Args:
        dataset: Object exposing a ``write`` attribute with the chained
            ``format`` / ``option`` / ``mode`` / ``save`` writer interface.
        storage_path: Destination handed to ``save``.
        option: Name of the writer option that is switched on by setting it
            to "true" (callers pass "overwriteSchema" or "mergeSchema").
        mode: Save mode handed to the writer (callers pass "overwrite" or
            "append").
    """
    delta_writer = dataset.write.format("delta")
    delta_writer = delta_writer.option(option, "true")
    delta_writer = delta_writer.mode(mode)
    delta_writer.save(storage_path)

In [None]:
# Re-read the Delta data at segment_effort_path into segments_in_storage.
segments_in_storage = spark.read.format("delta").load(segment_effort_path)

In [None]:
# NOTE(review): this repeats, with identical arguments, the append issued in an
# earlier cell of this notebook; executing both in one session appends the same
# batch to segment_effort_path twice.
write_dataframe_to_storage(all_segment_ids,segment_effort_path, "mergeSchema", "append" )

In [None]:
# Preview up to 10 rows of the data read from segment_effort_path.
segments_in_storage.show(10)

In [None]:
# Local import: only this cell needs the exception type.
from pyspark.sql.utils import AnalysisException

try:
    # Query path, see if there are any activities with their associated segments written to storage
    segments_in_storage = spark.read.format("delta").load(segment_effort_path)
except AnalysisException:
    # Presumably the first run of the script: nothing readable at segment_effort_path yet.
    # NOTE(review): confirm a missing Delta path surfaces as AnalysisException in this runtime.
    # The original draft left this branch unfinished (empty assignment, and it
    # referenced historical_df_to_write / storagepath, which no visible cell
    # defines). It is completed here from the stated intent: seed storage with
    # the segments of the first 99 activities.
    # TODO: decide on an explicit sort order for full_activity_ids before slicing.
    top_99_activity_ids = full_activity_ids[:99]
    write_dataframe_to_storage(query_segments(top_99_activity_ids), segment_effort_path, "mergeSchema", "append")
    segments_in_storage = spark.read.format("delta").load(segment_effort_path)


Need to limit to 100 requests, as the API throws an error after that

In [None]:
# NOTE(review): mode "overwrite" with segment_id_df. That frame does not carry
# the placeholder rows for activities without segments (those only exist in
# all_segment_ids), and the overwrite appears to replace whatever earlier
# appends stored at segment_effort_path. Presumably a one-off reset -- confirm
# before running.
write_dataframe_to_storage(segment_id_df,segment_effort_path, "overwriteSchema","overwrite" )

In [None]:
# Re-read the Delta data at segment_effort_path into segments_in_storage.
segments_in_storage = spark.read.format("delta").load(segment_effort_path)

In [None]:
#need to get distinct activity_ids and run them through the segment
#we make 1 api query in the first script, so gonna be allowed 99 with this run 

In [None]:
#get all of the activity_ids, limit to 15 as that's how many we can run in a single query
#also need to query the activities we have written to segment storage, so as not to repeat 
#Strava API usage is limited on a per-application basis using both a 15-minute and daily request limit. The default rate limit allows 100 requests every 15 minutes, 
# with up to 1,000 requests per day.

#compare current activities vs what is written, 
#going to need to write some try/except blocks for when these values are not returned as expected 