#489 enable UDP queries for MyLA cron job #1392

Merged: 6 commits, Sep 7, 2022
Changes from all commits
README.md (4 changes: 2 additions & 2 deletions)
@@ -204,9 +204,9 @@ To update snapshots, execute `docker exec -it webpack_watcher npm run-script upd

Data validation scripts are in scripts/data_validation folder. To run the data validation scripts, follow the steps below:

-1. Copy env.hjson file from env_sample.hjson file
+1. Copy env_validation.hjson file from env_sample.hjson file
```sh
-cp scripts/data_validation/env_sample.hjson scripts/data_validation/env.hjson
+cp scripts/data_validation/env_sample.hjson scripts/data_validation/env_validation.hjson
```
2. Update the configuration values. Note the hard-coded Michigan specific filters in various query strings. Please adjust those values for your institution usage.
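Before running the scripts, a quick parse check of the copied file can catch hjson syntax mistakes early. A minimal sketch using the `hjson` package; the path follows the `cp` command above:

```python
import hjson  # pip install hjson

# Parse the freshly copied config; a malformed file fails here,
# before any database connections are attempted.
with open('scripts/data_validation/env_validation.hjson') as f:
    env = hjson.load(f)

print(sorted(env.keys()))
```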

config/cron.hjson (6 changes: 3 additions & 3 deletions)
@@ -9,14 +9,14 @@
with
enroll_data as (select id as enroll_id, user_id, type from enrollment_dim where course_id='{course_id}'
and type in ('StudentEnrollment', 'TaEnrollment', 'TeacherEnrollment') and workflow_state= 'active'),
-user_info as (select p.unique_name,p.sis_user_id, u.name, u.id as user_id, u.global_canvas_id
+user_info as (select p.sis_user_id, u.id as user_id, u.global_canvas_id
from (SELECT ROW_NUMBER() OVER (PARTITION BY user_id order by sis_user_id asc) AS row_number, * FROM pseudonym_dim) as p
join user_dim u on u.id = p.user_id WHERE row_number = 1),
-user_enroll as (select u.unique_name, u.sis_user_id, u.name, u.user_id, e.enroll_id,
+user_enroll as (select u.sis_user_id, u.user_id, e.enroll_id,
u.global_canvas_id, e.type from enroll_data e join user_info u on e.user_id= u.user_id),
course_fact as (select enrollment_id, current_score, final_score from course_score_fact
where course_id='{course_id}'),
-final as (select cast(u.global_canvas_id as BIGINT) as user_id,u.name, u.unique_name as sis_name,
+final as (select cast(u.global_canvas_id as BIGINT) as user_id,
'{course_id}' as course_id, c.current_score as current_grade, c.final_score as final_grade,
u.type as enrollment_type
from user_enroll u left join course_fact c on u.enroll_id= c.enrollment_id)
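As context for the `user_info` CTE above: the `ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY sis_user_id ASC)` wrapper plus the `WHERE row_number = 1` filter keeps exactly one pseudonym row per user, preferring the lowest `sis_user_id`. A rough pandas equivalent of that dedup, for illustration only:

```python
import pandas as pd

# Toy pseudonym_dim rows; user_id 1 has two pseudonyms.
pseudonym_dim = pd.DataFrame({
    'user_id': [1, 1, 2],
    'sis_user_id': ['B2', 'A1', 'C1'],
})

# Keep one row per user_id, preferring the smallest sis_user_id --
# the same effect as ROW_NUMBER() ... WHERE row_number = 1.
deduped = (pseudonym_dim
           .sort_values('sis_user_id')
           .drop_duplicates('user_id', keep='first'))
print(deduped)
```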
config/cron_udp.hjson (26 changes: 11 additions & 15 deletions)
@@ -1,20 +1,13 @@
## UDP (Unizin Data Platform) queries for MyLA cron job
{
"metadata":
-## placeholder query before the unizin_metadata table is implemented in UDP
'''
select "learner_activity" as pkey, max(updated_date) as pvalue from entity.learner_activity
select 'canvasdatadate' as pkey, min(dag_run) as pvalue from report.publish_info pi2
''',
"user" :
'''
select
({canvas_data_id_increment} + cast(p2.lms_ext_id as bigint)) as user_id,
-case
-when p.first_name is not null then p.first_name || ' ' || p.last_name
-else p2.sis_ext_id end as name,
-case
-when pe.email_address is not null then lower(split_part(pe.email_address , '@', 1))
-else p2.sis_ext_id end as sis_name,
co.lms_int_id as course_id,
cg.le_current_score as current_grade,
cg.le_final_score as final_grade,
@@ -152,8 +145,8 @@
cast(ka.lms_int_id as BIGINT) as id,
cast(ka.lms_ext_id as BIGINT) as canvas_id,
a.name as name,
-a.le_term_begin_date at time zone 'UTC' as date_start,
-a.le_term_end_date at time zone 'UTC' as date_end
+a.le_term_begin_date::timestamp without time zone as date_start,
+a.le_term_end_date::timestamp without time zone as date_end
from
entity.academic_term as a
left join keymap.academic_term as ka on ka.id = a.academic_term_id
@@ -171,9 +164,9 @@
cast(co2.lms_int_id as BIGINT) as id,
cast(co2.lms_ext_id as BIGINT) as canvas_id,
cast(at2.lms_int_id as BIGINT) as enrollment_term_id,
-co.title as name, -- different than Canvas course name
-co.le_start_date at time zone 'UTC' as start_at,
-co.le_end_date at time zone 'UTC' as conclude_at
+co.le_code as name,
+co.le_start_date::timestamp without time zone as start_at,
+co.le_end_date::timestamp without time zone as conclude_at
FROM
entity.course_offering co
LEFT OUTER JOIN entity.academic_session as3 on (co.academic_session_id = as3.academic_session_id),
@@ -183,10 +176,12 @@
WHERE co2.lms_int_id = '{course_id}'
and co.course_offering_id = co2.id
and (
-(co.academic_session_id is null and at2.id = no_term_temp.academic_term_id)
+(co.academic_session_id is null and co.academic_term_id is not null and (co.academic_term_id = at2.id))
+or
+(co.academic_session_id is null and co.academic_term_id is null and at2.id = no_term_temp.academic_term_id)
or
(co.academic_session_id = as3.academic_session_id and at2.id = as3.academic_term_id)
)
)
''',
"resource":
'''
@@ -310,6 +305,7 @@
group by assignment_id
) as f1
on f.assignment_id = f1.assignment_id
+where f.id is not null
order by assignment_id, user_id
'''
}
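For reference, the entries in this file are templates rather than finished SQL: the cron job and the validation script fill the `{course_id}` and `{canvas_data_id_increment}` placeholders with `str.format` before execution (see validate_udw_vs_udp.py below). A minimal sketch; the course id and increment values are made-up examples:

```python
import hjson

# Load the UDP query templates from this config file.
with open('config/cron_udp.hjson') as f:
    queries = hjson.load(f)

# Fill the placeholders for one course before running the query;
# both values below are hypothetical.
user_sql = queries['user'].format(
    course_id='123456',
    canvas_data_id_increment=0)
```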
dashboard/cron.py (6 changes: 3 additions & 3 deletions)
@@ -206,7 +206,7 @@ def update_unizin_metadata(self):
        status += delete_all_records_in_table("unizin_metadata")

        # select all student registered for the course
-        metadata_sql = "select key as pkey, value as pvalue from unizin_metadata"
+        metadata_sql = queries['metadata']

        logger.debug(metadata_sql)
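The point of this swap is that `queries['metadata']` now carries per-source SQL (the UDW and UDP variants shown in the hjson diffs above), and both variants return the same two-column `(pkey, pvalue)` shape, so the rest of `update_unizin_metadata` stays source-agnostic. A standalone sketch of that contract; the engine URL is hypothetical:

```python
import pandas as pd
from sqlalchemy import create_engine

# Hypothetical connection; the real engine comes from the app's settings.
engine = create_engine('postgresql://user:password@db-host/warehouse')

queries = {
    # UDW variant (config/cron.hjson):
    #   select key as pkey, value as pvalue from unizin_metadata
    # UDP variant (config/cron_udp.hjson), used here:
    'metadata': "select 'canvasdatadate' as pkey, min(dag_run) as pvalue from report.publish_info pi2",
}

# Either variant yields rows exposing .pkey and .pvalue.
df = pd.read_sql(queries['metadata'], engine)
for row in df.itertuples(index=False):
    print(row.pkey, row.pvalue)
```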

@@ -234,10 +234,10 @@ def update_canvas_resource(self):
        for row in df_attach.itertuples(index=False):
            if row.file_state == 'available':
                Resource.objects.filter(resource_id=row.id).update(name=row.display_name)
-                status += f"Row {row.id} updated to {row.display_name}\n"
+                logger.debug(f"Row {row.id} updated to {row.display_name}")
            else:
                Resource.objects.filter(resource_id=row.id).delete()
-                status += f"Row {row.id} removed as it is not available\n"
+                logger.debug(f"Row {row.id} removed as it is not available")
        return status

# update RESOURCE_ACCESS records from BigQuery or LRS data sources
scripts/data_validation/env_validation_sample.hjson (1 change: 1 addition & 0 deletions)
@@ -135,3 +135,4 @@
]
}
}
+}
scripts/data_validation/validate_udw_vs_udp.py (74 changes: 43 additions & 31 deletions)
@@ -5,31 +5,41 @@
import pandas as pd
from sqlalchemy import create_engine

+logging.basicConfig()
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)

+def format_df(df):
+    return '\n'+ df.to_string()

-def compare_udw_vs_udp_df(udw_query_string, udp_query_string, udw_engine, udp_engine):
+def compare_udw_vs_udp_df(udw_query_string, udp_query_string, udw_engine, udp_engine, query_params):
    udw_df = pd.read_sql(
-        udw_query_string, udw_engine)
+        udw_query_string, udw_engine, params=query_params)

    udp_df = pd.read_sql(
-        udp_query_string, udp_engine)
+        udp_query_string, udp_engine, params=query_params)

-    # compare the two dataframes
-    print(f"shape of dataframe from udw table: {udw_df.shape} column types: {udw_df.dtypes}")
-    print(udw_df.to_string())
-    print(f"shape of dataframe from udp table: {udp_df.shape} column types: {udp_df.dtypes}")
-    print(udp_df.to_string())
+    # Debug out the data frames
+    logger.debug("UDW Dataframe:")
+    logger.debug(format_df(udw_df))
+    logger.debug("UDP Dataframe:")
+    logger.debug(format_df(udp_df))

    # print diff records
    df_diff = pd.concat([udw_df, udp_df]).drop_duplicates(keep=False)
-    print("different records:")
-    print(df_diff)
-    print("end")

-    pd.testing.assert_frame_equal(udw_df, udp_df, check_dtype=False, check_exact=False, rtol=1e-02, atol=1e-03)

+    if df_diff.empty:
+        logger.info("No differences found")
+    else:
+        logger.info("Differences found. Different records:")
+        logger.info(f"shape of dataframe from udw table: {udw_df.shape} column types: {udw_df.dtypes}")
+        logger.info(f"shape of dataframe from udp table: {udp_df.shape} column types: {udp_df.dtypes}")
+        logger.info(format_df(df_diff))

+    # assert that the two dataframes are the same but continue even if they're not
+    try:
+        pd.testing.assert_frame_equal(udw_df, udp_df, check_dtype=False, check_exact=False, rtol=1e-02, atol=1e-03)
+    except AssertionError as e:
+        logger.error(e)
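A minimal usage sketch for the revised signature; the connection strings, queries, and course ids are all hypothetical. The `%(course_ids)s` placeholder is expanded from `query_params` by pandas and the DB driver, mirroring how `main()` builds `query_params` below:

```python
from sqlalchemy import create_engine

# Hypothetical connection strings for the two warehouses.
udw_engine = create_engine('postgresql://user:password@udw-host/udw')
udp_engine = create_engine('postgresql://user:password@udp-host/udp')

compare_udw_vs_udp_df(
    "select id from enrollment_dim where course_id in %(course_ids)s",
    "select lms_int_id as id from keymap.course_offering where lms_int_id in %(course_ids)s",
    udw_engine,
    udp_engine,
    {"course_ids": ("123456", "654321")},
)
```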

def get_db_engine(connection_json):
    db_name = connection_json['NAME']
@@ -71,16 +81,17 @@ def main():

    # Set up ENV for both UDW and UDP
    dir_path = os.path.dirname(os.path.realpath(__file__))
-    print(dir_path)
-    print(os.path.abspath(os.path.abspath(os.pardir)))
+    logger.debug(dir_path)
+    logger.debug(os.path.abspath(os.path.abspath(os.pardir)))
    ENV_UDW = get_env_file('/secrets/env.hjson', 'hjson')
    ENV_UDP = get_env_file('/secrets/env_udp.hjson', 'hjson')
-    ENV_CRON_UDW = get_env_file('/secrets/cron.hjson', 'hjson')
-    ENV_CRON_UDP = get_env_file('/secrets/cron_udp.hjson', 'hjson')

+    # Use the config files in this project
+    ENV_CRON_UDW = get_env_file('/code/config/cron.hjson', 'hjson')
+    ENV_CRON_UDP = get_env_file('/code/config/cron_udp.hjson', 'hjson')
+    ENV_VALIDATION = get_env_file(
+        os.path.join(os.path.dirname(os.path.abspath('__file__')), 'scripts/data_validation/env_validation.hjson'), 'hjson')

-    # print(hjson.dumps(ENV))
    udw_engine = get_db_engine(ENV_UDW['DATA_WAREHOUSE'])
    udp_engine = get_db_engine(ENV_UDP['DATA_WAREHOUSE'])

@@ -90,25 +101,26 @@

    # loop through course ids
    for course_id in DATA_WAREHOUSE_COURSE_IDS:
-        print(f'\n\nfor course id {course_id}:')
+        logger.info(f'Validating course id {course_id}:')

        # from the configuration variable, load the queries based on UDP expanded vs events table
        # run queries and compare the returned dataframes
-        cron_udw_json = ENV_CRON_UDW["CRON_QUERIES"]
-        cron_udp_json = ENV_CRON_UDP["CRON_QUERIES"]
-        for query_type in cron_udw_json:
-            print(f'\ncomparing type {query_type}:')
-            formatted_udw_query_string = cron_udw_json[query_type].format(
+        for query_type in ENV_CRON_UDW:
+            logger.info(f'Comparing query {query_type}:')

+            formatted_udw_query_string = ENV_CRON_UDW[query_type].format(
                course_id=course_id, canvas_data_id_increment=CANVAS_DATA_ID_INCREMENT)
-            formatted_udp_query_string = cron_udp_json[query_type].format(
+            formatted_udp_query_string = ENV_CRON_UDP[query_type].format(
                course_id=course_id, canvas_data_id_increment=CANVAS_DATA_ID_INCREMENT)
-            print(formatted_udw_query_string)
-            print(formatted_udp_query_string)
+            logger.debug(formatted_udw_query_string)
+            logger.debug(formatted_udp_query_string)
+            query_params = {
+                "course_ids": tuple(DATA_WAREHOUSE_COURSE_IDS)
+            }

            compare_udw_vs_udp_df(
-                formatted_udw_query_string, formatted_udp_query_string, udw_engine, udp_engine)
+                formatted_udw_query_string, formatted_udp_query_string, udw_engine, udp_engine, query_params)


if __name__ == "__main__":
    main()