In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Reuse the already-running SparkContext if there is one; otherwise create it.
sc = SparkContext.getOrCreate()
# Glue wrapper around the SparkContext; the cells below use it for the
# catalog reads and the JDBC writes.
glueContext = GlueContext(sc)
# Spark session exposed by the Glue context.
spark = glueContext.spark_session
# Glue Job handle bound to this context.
# NOTE(review): no job.init()/job.commit() call is visible in this notebook — confirm whether one is needed.
job = Job(glueContext)

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from awsglue.dynamicframe import DynamicFrame

In [None]:
# Read the raw OECD table from the Glue Data Catalog.
oecd_dyf = glueContext.create_dynamic_frame.from_catalog(
    database='intl_students_raw',
    table_name='oecd',
)
# Force the 'value' column to long, then switch to a Spark DataFrame.
oecd_res_dyf = oecd_dyf.resolveChoice(specs=[('value', 'cast:long')])
oecd_df = oecd_res_dyf.toDF()

In [None]:
# Read the raw World Bank table from the Glue Data Catalog as a Spark DataFrame.
wb_wdi_dyf = glueContext.create_dynamic_frame.from_catalog(
    database='intl_students_raw',
    table_name='worldbank',
)
wb_wdi_df = wb_wdi_dyf.toDF()

In [None]:
# Read the ISO 3166 country-code table from the Glue Data Catalog as a Spark DataFrame.
iso_cc_dyf = glueContext.create_dynamic_frame.from_catalog(
    database='intl_students_raw',
    table_name='iso3166',
)
iso_cc_df = iso_cc_dyf.toDF()

In [None]:
# Keep only rows whose SEX is '_T' (presumably the all-sexes total — TODO confirm
# against the OECD code list), then drop SEX and 'Flag Codes'.
oecd_df = oecd_df.filter(col('SEX') == '_T').drop('SEX', 'Flag Codes')

In [None]:
# Include only data from 2013 to 2019 (both endpoints inclusive).
oecd_df = oecd_df.withColumn('YEAR', col('YEAR').cast('int'))
oecd_df = oecd_df.filter(col('YEAR').between(2013, 2019))

In [None]:
# Include only records whose country codes are found in ISO 3166.
# A left-semi join keeps the left rows that have a match (and only the left
# columns), so the check runs inside Spark instead of collecting the ISO codes
# to the driver through pandas and shipping them back as `isin` lists.
iso_code_two_df = iso_cc_df.select('DIGIT_CODE_TWO').distinct()
iso_code_three_df = iso_cc_df.select('DIGIT_CODE_THREE').distinct()

# ORIGIN is matched against the two-character codes ...
oecd_df = oecd_df.join(
    iso_code_two_df,
    oecd_df['ORIGIN'] == iso_code_two_df['DIGIT_CODE_TWO'],
    'left_semi',
)
# ... and COUNTRY against the three-character codes.
oecd_df = oecd_df.join(
    iso_code_three_df,
    oecd_df['COUNTRY'] == iso_code_three_df['DIGIT_CODE_THREE'],
    'left_semi',
)

In [None]:
# Exclude education levels ISCED11_54 and ISCED11_55.
oecd_df = oecd_df.filter(~col('EDUCATION_LEV').isin('ISCED11_54', 'ISCED11_55'))

In [None]:
# Mark both OECD country columns as codes before they are joined to the ISO table.
oecd_df = (
    oecd_df
    .withColumnRenamed('COUNTRY', 'COUNTRY_CODE')
    .withColumnRenamed('ORIGIN', 'ORIGIN_CODE')
)

In [None]:
# First step towards table OECD_EDU_ENRL_2013_2019: look up the ISO row whose
# three-character code matches COUNTRY_CODE and expose its COUNTRY column as
# COUNTRY_OF_SCHOOL.
merge_df = (
    oecd_df
    .join(iso_cc_df, oecd_df['COUNTRY_CODE'] == iso_cc_df['DIGIT_CODE_THREE'], 'left')
    .select(
        col('COUNTRY').alias('COUNTRY_OF_SCHOOL'),
        'ORIGIN_CODE',
        'EDUCATION_LEV',
        'YEAR',
        'Value',
    )
)

In [None]:
# Look up the ISO row whose two-character code matches ORIGIN_CODE and expose
# its COUNTRY column as COUNTRY_OF_ORIGIN; give the remaining columns their final names.
oecd_enrl_df = (
    merge_df
    .join(iso_cc_df, merge_df['ORIGIN_CODE'] == iso_cc_df['DIGIT_CODE_TWO'], 'left')
    .select(
        'COUNTRY_OF_SCHOOL',
        col('COUNTRY').alias('COUNTRY_OF_ORIGIN'),
        col('EDUCATION_LEV').alias('EDUCATION_LEVEL'),
        'YEAR',
        col('Value').alias('NUM_OF_STUDENTS'),
    )
)

In [None]:
# Sum the student counts per (school country, origin country, year);
# the education level is no longer part of the grouping key.
oecd_enrl_df = (
    oecd_enrl_df
    .groupBy('COUNTRY_OF_SCHOOL', 'COUNTRY_OF_ORIGIN', 'YEAR')
    .agg({'NUM_OF_STUDENTS': 'sum'})
    .withColumnRenamed('sum(NUM_OF_STUDENTS)', 'TOTAL_INTL_STUDENTS')
)

In [None]:
# First step towards table INTL_STUDENT_ORIGIN_2013_2019: same origin lookup as
# for the enrolment frame, but the origin's three-character code is kept as
# ORIGIN_COUNTRY_CODE.
intl_student_origin_df = (
    merge_df
    .join(iso_cc_df, merge_df['ORIGIN_CODE'] == iso_cc_df['DIGIT_CODE_TWO'], 'left')
    .select(
        'COUNTRY_OF_SCHOOL',
        col('COUNTRY').alias('COUNTRY_OF_ORIGIN'),
        col('DIGIT_CODE_THREE').alias('ORIGIN_COUNTRY_CODE'),
        col('EDUCATION_LEV').alias('EDUCATION_LEVEL'),
        'YEAR',
        col('Value').alias('NUM_OF_STUDENTS'),
    )
)

In [None]:
# Sum the student counts per (origin country, origin code, year), across all
# school countries and education levels.
intl_student_origin_df = (
    intl_student_origin_df
    .groupBy('COUNTRY_OF_ORIGIN', 'ORIGIN_COUNTRY_CODE', 'YEAR')
    .agg({'NUM_OF_STUDENTS': 'sum'})
    .withColumnRenamed('sum(NUM_OF_STUDENTS)', 'TOTAL_INTL_STUDENTS')
)

In [None]:
# First step towards table INTL_STUDENTS_PER_POPULATION: keep only the
# 'SP.POP.TOTL' series and rename YEAR to POP_YEAR so it does not clash with
# the student frame's YEAR column when the two are joined.
wdi_pop_df = (
    wb_wdi_df
    .filter(col('SERIES_CODE') == 'SP.POP.TOTL')
    .withColumnRenamed('YEAR', 'POP_YEAR')
)

In [None]:
# Pair every (origin country, year) student total with the population row for
# the same country code and year; rows without a match are dropped (inner join).
intl_student_per_pop_df = intl_student_origin_df.join(
    wdi_pop_df,
    on=[
        intl_student_origin_df['ORIGIN_COUNTRY_CODE'] == wdi_pop_df['COUNTRY_CODE'],
        intl_student_origin_df['YEAR'] == wdi_pop_df['POP_YEAR'],
    ],
    how='inner',
)

In [None]:
# Shape the joined frame into the INTL_STUDENTS_PER_POPULATION layout.
intl_student_per_pop_df = (
    intl_student_per_pop_df
    .select('COUNTRY_OF_ORIGIN', 'ORIGIN_COUNTRY_CODE', 'YEAR', 'TOTAL_INTL_STUDENTS', 'VALUE')
    # The population value is cast to long and renamed.
    .withColumn('Value', col('Value').cast(LongType()))
    .withColumnRenamed('Value', 'POPULATION')
    # A zero population would make the percentage below undefined.
    .filter(col('POPULATION') != 0)
    # Both operands become doubles before the division.
    .withColumn('POPULATION', col('POPULATION').cast(DoubleType()))
    .withColumn('TOTAL_INTL_STUDENTS', col('TOTAL_INTL_STUDENTS').cast(DoubleType()))
    .withColumn(
        'PERCENT_INTL_STUDENTS_PER_POPULATION',
        col('TOTAL_INTL_STUDENTS') / col('POPULATION') * 100,
    )
    # The code was only needed as the join key.
    .drop('ORIGIN_COUNTRY_CODE')
)

In [None]:
# Wrap the enrolment DataFrame as a Glue DynamicFrame for the Glue writer.
oecd_enrl_dy_df = DynamicFrame.fromDF(
    oecd_enrl_df,
    glueContext,
    'oecd_enrl_dy_df',
)

In [None]:
# Wrap the per-population DataFrame as a Glue DynamicFrame for the Glue writer.
intl_student_per_pop_dy_df = DynamicFrame.fromDF(
    intl_student_per_pop_df,
    glueContext,
    'intl_student_per_pop_dy_df',
)

In [None]:
# Wrap the per-origin DataFrame as a Glue DynamicFrame for the Glue writer.
intl_student_origin_dy_df = DynamicFrame.fromDF(
    intl_student_origin_df,
    glueContext,
    'intl_student_origin_dy_df',
)

In [None]:
import os

import redshift_connector

# Connect to Redshift Serverless with a database user and password.
# Every setting is read from the environment so that no credential is stored
# in the notebook; a missing variable fails fast with a KeyError naming it.
conn = redshift_connector.connect(
    host=os.environ['REDSHIFT_HOST'],
    database=os.environ['REDSHIFT_DATABASE'],
    user=os.environ['REDSHIFT_USER'],
    password=os.environ['REDSHIFT_PASSWORD'],
    is_serverless=True,
    serverless_acct_id=os.environ['REDSHIFT_SERVERLESS_ACCT_ID'],
    serverless_work_group=os.environ['REDSHIFT_SERVERLESS_WORK_GROUP'],
)

# Cursor used by the cells below to run the CREATE TABLE statements.
cursor: redshift_connector.Cursor = conn.cursor()

In [None]:
# Roll back whatever transaction is open on the connection, then turn on
# autocommit for the statements that follow.
# NOTE(review): the rollback presumably has to come first because autocommit
# cannot be switched while a transaction is open — confirm against the
# redshift_connector documentation before reordering.
conn.rollback()
conn.autocommit = True

In [None]:
# Create the target table for the enrolment frame if it does not exist yet.
# The four columns mirror the columns of oecd_enrl_df after its aggregation.
cursor.execute('''
CREATE TABLE IF NOT EXISTS public.OECD_EDU_ENRL_2013_2019 (
  "COUNTRY_OF_SCHOOL" TEXT,
  "COUNTRY_OF_ORIGIN" TEXT,
  "YEAR" INTEGER,
  "TOTAL_INTL_STUDENTS" BIGINT
)
''')

In [None]:
# Target table and database for writing the enrolment frame.
my_conn_options = dict(
    dbtable="public.OECD_EDU_ENRL_2013_2019",
    database="dev",
)

In [None]:
# Write the enrolment DynamicFrame to the table named in my_conn_options,
# going through the Glue catalog connection and the Redshift temp directory.
# NOTE(review): this presumably appends, so re-running the cell would load the
# same rows again — confirm.
rds_res = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = oecd_enrl_dy_df,
    catalog_connection = "*******************",
    connection_options = my_conn_options,
    redshift_tmp_dir = "********************"
)

In [None]:
# Create the target table for the per-population frame if it does not exist yet.
# The numeric columns are DOUBLE PRECISION to match the Spark DoubleType columns
# they receive: a 4-byte REAL keeps only about six significant digits, which
# would round national population figures.
# An already existing table keeps its old column types (IF NOT EXISTS).
cursor.execute('''
CREATE TABLE IF NOT EXISTS public.INTL_STUDENTS_PER_POPULATION (
  "COUNTRY_OF_ORIGIN" TEXT,
  "YEAR" INTEGER,
  "TOTAL_INTL_STUDENTS" DOUBLE PRECISION,
  "POPULATION" DOUBLE PRECISION,
  "PERCENT_INTL_STUDENTS_PER_POPULATION" DOUBLE PRECISION
)
''')

In [None]:
# Target table and database for writing the per-population frame.
my_conn_options = dict(
    dbtable="public.INTL_STUDENTS_PER_POPULATION",
    database="dev",
)

In [None]:
# Write the per-population DynamicFrame to the table named in my_conn_options,
# going through the Glue catalog connection and the Redshift temp directory.
# NOTE(review): this presumably appends, so re-running the cell would load the
# same rows again — confirm.
rds_res = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = intl_student_per_pop_dy_df,
    catalog_connection = "******************",
    connection_options = my_conn_options,
    redshift_tmp_dir = "*******************"
)

In [None]:
# Create the target table for the per-origin frame if it does not exist yet.
# The four columns mirror the columns of intl_student_origin_df after its aggregation.
cursor.execute('''
CREATE TABLE IF NOT EXISTS public.INTL_STUDENT_ORIGIN_2013_2019 (
  "COUNTRY_OF_ORIGIN" TEXT,
  "ORIGIN_COUNTRY_CODE" TEXT,
  "YEAR" INTEGER,
  "TOTAL_INTL_STUDENTS" BIGINT
)
''')

In [None]:
# Target table and database for writing the per-origin frame.
my_conn_options = dict(
    dbtable="public.INTL_STUDENT_ORIGIN_2013_2019",
    database="dev",
)

In [None]:
# Write the per-origin DynamicFrame to the table named in my_conn_options,
# going through the Glue catalog connection and the Redshift temp directory.
# NOTE(review): this presumably appends, so re-running the cell would load the
# same rows again — confirm.
rds_res = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = intl_student_origin_dy_df,
    catalog_connection = "*********************",
    connection_options = my_conn_options,
    redshift_tmp_dir = "*************************"
)