In [1]:
import os
import glob
import pandas as pd
from sqlalchemy import create_engine

In [2]:
def insert_data(engine, destination_table, df_func, func_args=[]):
    """Inserts a dataframe into a table.
    """
    df = df_func(*func_args)
    df.to_sql(destination_table, engine, if_exists='append', index=False, chunksize=1000)

In [3]:
def prepare_instiution_df(filepath):
    """Prepares a dataframe for institutions data.
    """
    df = pd.read_json(filepath).drop(columns=['code'])
    df.columns = ['institution_code','application_reference_number','organisation_name','country','city']
    return df

In [4]:
def prepare_date_df(start_date, end_date):
    """ Prepares a dataframe for the date dimension.
    
    A pandas dataframe is prepared that contains 
    every given month within a given date range
    along with a couple of attributes for each month.
    """
    
    def calculate_academic_year(row):
        """ Calculate academic year based on a month.
        
        For the months January to August, the academic
        year is the starting in the previous year.
        For the months September to October the academic
        year starts in the current year.
        """
        if row['month_number'] <= 8:
            return '{:04d}-{:02d}'.format((row['year_number']-1), row['year_number']%100)
        else:
            return '{:04d}-{:02d}'.format(row['year_number'], (row['year_number']+1)%100)
        
    df = pd.date_range(start_date, end_date, freq='MS')\
                 .strftime("%Y-%m").to_frame()
    df.columns = ['year_month_code']
    df['month_number'] = df.year_month_code.apply(lambda date: int(pd.to_datetime(date).strftime('%m')))
    df['month_name'] = df.year_month_code.apply(lambda date: pd.to_datetime(date).strftime('%B'))
    df['year_number'] = df.year_month_code.apply(lambda date: int(pd.to_datetime(date).strftime('%Y')))
    df['academic_year'] = df.apply(lambda row: calculate_academic_year(row), axis=1)
    return df

In [5]:
def prepare_student_mobility_df(filepath):
    """Prepares a dataframe for student mobility data.
    """
    
    def convert_date(date):
        """ Converts month to yyyy-mm
        """
        try: 
            return pd.to_datetime(date).strftime('%Y-%m')
        except ValueError:
            return None

    def clean_amount(amount):
        """ Converts grant amount to type float if possible"""
        try:
            return float(amount)
        except ValueError:
            return None
        
    def min_date(row):
        """ Find earliest date
        """
        try: 
            min(value for value in [row['study_start_date'], row['work_start_date']] if value is not None)
        except ValueError:
            return None
    
    df = pd.read_csv(filepath, sep=';')
    df2 = pd.DataFrame()
    df2['home_institution_code'] = df.home_institution
    df2['host_institution_code'] = df.host_institution
    df2['mobility_type'] = df.mobility_type
    df2['student_age'] = df.age
    df2['student_gender'] = df.gender
    df2['student_nationality'] = df.nationality
    df2['student_level'] = df.level_study
    df2['student_years_prior'] = df.years_prior
    df2['student_previous_participation'] = df.previous_participation
    df2['work_placement'] = df.work_placement
    df2['work_placement_country'] = df.country_work_placement
    df2['length_study'] = df.length_study
    df2['length_work'] = df.length_work
    df2['short_duration_reason'] = df.short_duration_reason
    df2['study_start_date'] = df.study_start_date.apply(convert_date)
    df2['work_start_date'] = df.work_start_date.apply(convert_date)
    df2['combined_start_date'] =  df2.apply(lambda row: min_date(row), axis=1)
    df2['ects_study'] = df.ects_study
    df2['ects_work'] = df.ects_work
    df2['ects_total'] = df2.ects_study + df2.ects_work
    df2['sn_supplement'] = df.sn_supplement
    df2['study_grant'] = df.study_grant.apply(clean_amount)
    df2['work_grant'] = df.work_grant.apply(clean_amount)
    df2['total_grant'] = df2.sn_supplement + df2.study_grant + df2.work_grant
    return df2

In [6]:
def process_student_mobility_data(engine, filepath, destination_table, df_func):
    """Processes all files in a given filepath.

    Gets all files from a given filepath and prepares a
    pandas dataframe for every file which will then be 
    inserted into the destination table.
    """
    # get all files matching extension from directory
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root, '*.csv'))
        for f in files:
            all_files.append(os.path.abspath(f))

    # get total number of files found
    num_files = len(all_files)
    print('{} files found in {}'.format(num_files, filepath))

    # iterate over files and process
    for i, datafile in enumerate(all_files, 1):
        insert_data(engine, destination_table, df_func, [datafile])
        print('{}/{} files processed ({}).'.format(i, num_files, datafile))

In [7]:
def main():
    """
        Executes ETL process for student mobility data.
    """
    engine = create_engine('postgresql://tag:@localhost:5432/studentdb')
    # load dimensions
    insert_data(engine, 'dim_institution', prepare_instiution_df, ['cleaned_data/institutions/euc.json'])
    insert_data(engine, 'dim_date', prepare_date_df, ['2008-01-01', '2013-12-01'])
    # load facts
    process_student_mobility_data(engine, 'cleaned_data/students_2008_2012', 'fact_student_mobility', prepare_student_mobility_df)
    
main()

50 files found in cleaned_data/students_2008_2012
1/50 files processed (/Users/tag/repos/dend-capstone/cleaned_data/students_2008_2012/part_009.csv).
2/50 files processed (/Users/tag/repos/dend-capstone/cleaned_data/students_2008_2012/part_021.csv).
3/50 files processed (/Users/tag/repos/dend-capstone/cleaned_data/students_2008_2012/part_035.csv).
4/50 files processed (/Users/tag/repos/dend-capstone/cleaned_data/students_2008_2012/part_034.csv).
5/50 files processed (/Users/tag/repos/dend-capstone/cleaned_data/students_2008_2012/part_020.csv).
6/50 files processed (/Users/tag/repos/dend-capstone/cleaned_data/students_2008_2012/part_008.csv).
7/50 files processed (/Users/tag/repos/dend-capstone/cleaned_data/students_2008_2012/part_036.csv).
8/50 files processed (/Users/tag/repos/dend-capstone/cleaned_data/students_2008_2012/part_022.csv).
9/50 files processed (/Users/tag/repos/dend-capstone/cleaned_data/students_2008_2012/part_023.csv).
10/50 files processed (/Users/tag/repos/dend-capst