In [36]:
%load_ext autoreload
%autoreload 2

from modules.auth import *
from modules.assessments_endpoints import *
from modules.frame_transformations import *
from modules.config import base_url_illuminate
import logging
import os
import sys
from pyspark import RDD
from pyspark.sql import SparkSession

# Build the Spark session for this notebook; getOrCreate() reuses a session
# if one is already running in the kernel.
spark = (
    SparkSession.builder
    .appName("API Request Parallelization")
    .getOrCreate()
)

# Emit Spark's own log lines at INFO level.
spark.sparkContext.setLogLevel("INFO")


# Root logger setup: INFO and above, timestamped, written to stdout so the
# records show up in the notebook cell output.
# force=True drops any handlers that were installed earlier (e.g. by a
# previous run of this cell) before applying this configuration.
logging.basicConfig(
    force=True,
    handlers=[logging.StreamHandler(stream=sys.stdout)],
    format="%(asctime)s - %(message)s",
    datefmt="%d-%b-%y %H:%M:%S",
    level=logging.INFO,  # switch to DEBUG / WARNING as needed
)


def get_assessment_results(spark, save_path, view_path, years_data, start_date, end_date_override=None):
    """Fetch Illuminate assessment results and write them out as CSV files.

    Requests an API token, pulls the assessment metadata, fetches 'Group',
    'Standard' and 'No_Standard' scores for every assessment id through
    ``parallel_get_assessment_scores``, combines the No_Standard / Standard
    results, builds the view frame and saves everything with ``send_to_local``.

    Args:
        spark: Spark session handed to ``parallel_get_assessment_scores``.
        save_path: Directory for the group, combined and metadata CSVs.
        view_path: Directory for the view CSV.
        years_data: School-year label. Only '23-24' (historical file names)
            and '24-25' (current file names) are accepted.
        start_date: Start date forwarded to every score request.
        end_date_override: Optional end date forwarded to every score request
            (Group included).

    Raises:
        AirflowException: On any failure, including an unsupported
            ``years_data``. The underlying error is logged and chained as
            ``__cause__``.
    """
    logging.info('\n\n-------------New Illuminate Operations Logging Instance')

    # (group file, combined file, view file) for each supported school year.
    output_files = {
        '23-24': (
            'assessment_results_group_historical.csv',
            'assessment_results_combined_historical.csv',
            'illuminate_assessment_results_historical.csv',
        ),
        '24-25': (
            'assessment_results_group.csv',
            'assessment_results_combined.csv',
            'illuminate_assessment_results.csv',
        ),
    }

    try:
        # Reject an unsupported year before spending minutes on API requests.
        if years_data not in output_files:
            raise ValueError(f'Unexpected value for years variable data {years_data}')
        group_file, combined_file, view_file = output_files[years_data]

        access_token, _ = get_access_token()

        assessments_df, assessment_id_list = get_all_assessments_metadata(access_token)
        # assessment_id_list = assessment_id_list[:100] #for testing
        missing_ids_from_metadata = ['114845', '141498']  # Assessments that are not present in the assessments metadata
        assessment_id_list = list(set(assessment_id_list + missing_ids_from_metadata))
        logging.info(f'Here is the length of the assessment_id_list variable {len(assessment_id_list)}')

        # Every score type gets the same date window. The Group request used
        # to hard-code end_date_override=None, silently ignoring the caller's
        # end date for group results only.
        test_results = {}
        for score_type in ('Group', 'Standard', 'No_Standard'):
            test_results[score_type], _ = parallel_get_assessment_scores(
                spark, access_token, assessment_id_list, score_type, start_date, end_date_override
            )

        test_results_combined = bring_together_test_results(test_results['No_Standard'], test_results['Standard'])
        test_results_view = create_test_results_view(test_results_combined, years_data)  # add in grade level col, string matching
        logging.info("Assessment results fetched and processed.")

        # Both target directories must exist before anything is written.
        os.makedirs(save_path, exist_ok=True)
        os.makedirs(view_path, exist_ok=True)

        logging.info(f'Sending data for {years_data} school year')
        send_to_local(save_path, test_results['Group'], group_file)
        send_to_local(save_path, test_results_combined, combined_file)
        send_to_local(view_path, test_results_view, view_file)

        # No matter what, update the assessments_metadata file to display available assessments
        send_to_local(save_path, assessments_df, 'assessments_metadata.csv')

    except Exception as e:
        logging.error(f"Error fetching assessment results: {e}")
        # NOTE(review): AirflowException is not imported in this cell; presumably it
        # arrives through one of the modules.* star imports — TODO confirm.
        raise AirflowException("Failed to fetch and process assessment results") from e


# Historical pull for the 23-24 school year, with an explicit end date.
get_assessment_results(
    spark,
    save_path='/home/g2015samtaylor/illuminate',
    view_path='/home/g2015samtaylor/views',
    years_data='23-24',
    start_date='2023-07-01',
    end_date_override='2024-07-01',
)

# end_date_override='2024-07-01'  # should default to today's date



# Create spark session in main script
# Merge branch with main for feature enhancement practice.

# Add to requirements.txt
# Re-initiate docker with new tag of spark, in case need to roll back
# Update changes in docker file
# Make sure changes flow through to airflow
# Run locally for string matching


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


25/01/13 18:31:34 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
25/01/13 18:31:34 INFO SharedState: Warehouse path is 'file:/home/g2015samtaylor/airflow/git_directory/Illuminate/spark-warehouse'.
25/01/13 18:31:35 INFO BlockManagerInfo: Removed broadcast_1_piece0 on icef-instance-2.us-west2-a.c.icef-437920.internal:34901 in memory (size: 3.9 KiB, free: 366.3 MiB)


13-Jan-25 18:31:36 - 

-------------New Illuminate Operations Logging Instance
13-Jan-25 18:31:36 - Calling API token endpoint
13-Jan-25 18:31:36 - Succesfully retrieved API token
13-Jan-25 18:31:36 - Fetching data from https://icefps.illuminateed.com/live/rest_server.php/Api/Assessments/?page=1&limit=1000
13-Jan-25 18:31:36 - Here is the total num of pages on this endpoint 2
13-Jan-25 18:31:36 - Fetching data from https://icefps.illuminateed.com/live/rest_server.php/Api/Assessments/?page=2&limit=1000
13-Jan-25 18:31:36 - Here is the total num of pages on this endpoint 2
13-Jan-25 18:31:36 - Looped through 2 pages. Results for func get_all_assessments_metadata output into DataFrame
13-Jan-25 18:31:36 - Here is the length of the assessment_id_list variable 1128


25/01/13 18:31:37 INFO SparkContext: Starting job: collect at /home/g2015samtaylor/airflow/git_directory/Illuminate/modules/assessments_endpoints.py:226
25/01/13 18:31:37 INFO DAGScheduler: Got job 2 (collect at /home/g2015samtaylor/airflow/git_directory/Illuminate/modules/assessments_endpoints.py:226) with 4 output partitions
25/01/13 18:31:37 INFO DAGScheduler: Final stage: ResultStage 2 (collect at /home/g2015samtaylor/airflow/git_directory/Illuminate/modules/assessments_endpoints.py:226)
25/01/13 18:31:37 INFO DAGScheduler: Parents of final stage: List()
25/01/13 18:31:37 INFO DAGScheduler: Missing parents: List()
25/01/13 18:31:37 INFO DAGScheduler: Submitting ResultStage 2 (PythonRDD[5] at collect at /home/g2015samtaylor/airflow/git_directory/Illuminate/modules/assessments_endpoints.py:226), which has no missing parents
25/01/13 18:31:37 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 6.2 KiB, free 366.3 MiB)
25/01/13 18:31:37 INFO MemoryStore: Bloc

13-Jan-25 18:36:09 - Assessment results fetched and processed.
13-Jan-25 18:36:09 - Sending data for 23-24 school year
13-Jan-25 18:36:10 - assessment_results_group_historical.csv saved to /home/g2015samtaylor/illuminate
13-Jan-25 18:36:12 - assessment_results_combined_historical.csv saved to /home/g2015samtaylor/illuminate
13-Jan-25 18:36:14 - illuminate_assessment_results_historical.csv saved to /home/g2015samtaylor/views
13-Jan-25 18:36:14 - assessments_metadata.csv saved to /home/g2015samtaylor/illuminate


In [40]:
%load_ext autoreload
%autoreload 2

from modules.auth import *
from modules.assessments_endpoints import *
from modules.frame_transformations import *
from modules.config import base_url_illuminate
import logging
import os
import sys

import pandas as pd
from modules.frame_transformations import *
# Do not truncate long cell values when the frames below are displayed.
pd.set_option('display.max_colwidth', None)

# Input files for the 23-24 historical column fix.
historical_view_path = '/home/g2015samtaylor/views/illuminate_assessment_results_historical.csv'
path_fixes_file = '/home/g2015samtaylor/airflow/git_directory/Illuminate/modules/illuminate_historical_column_fixes_2324.csv'

access_token, expires_in = get_access_token()

# Loaded for interactive inspection; the call below is given the file paths,
# not these frames.
v = pd.read_csv(historical_view_path)
fixes = pd.read_csv(path_fixes_file)

h = fix_historical_columns(access_token, path_fixes_file, historical_view_path)


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
13-Jan-25 18:40:48 - Calling API token endpoint
13-Jan-25 18:40:48 - Succesfully retrieved API token
The length of the v frame is 103458 rows
the length after the merge is 103458 rows


25/01/13 19:01:03 INFO BlockManagerInfo: Removed broadcast_0_piece0 on icef-instance-2.us-west2-a.c.icef-437920.internal:34901 in memory (size: 3.9 KiB, free: 366.3 MiB)
25/01/13 19:01:03 INFO BlockManagerInfo: Removed broadcast_4_piece0 on icef-instance-2.us-west2-a.c.icef-437920.internal:34901 in memory (size: 3.9 KiB, free: 366.3 MiB)
25/01/13 19:01:03 INFO BlockManagerInfo: Removed broadcast_3_piece0 on icef-instance-2.us-west2-a.c.icef-437920.internal:34901 in memory (size: 3.9 KiB, free: 366.3 MiB)
25/01/13 19:01:03 INFO BlockManagerInfo: Removed broadcast_2_piece0 on icef-instance-2.us-west2-a.c.icef-437920.internal:34901 in memory (size: 3.9 KiB, free: 366.3 MiB)


In [33]:
# Reload the historical view CSV into v.
historical_view_path = '/home/g2015samtaylor/views/illuminate_assessment_results_historical.csv'

v = pd.read_csv(filepath_or_buffer=historical_view_path)
# TODO: record the length of v prior to the new raw file (the original note was left unfinished)