In [15]:
import pandas as pd
import sqlite3
import logging

# Set up logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(levelname)s: %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S')

# Load the data into pandas DataFrames
try:
    calendar = pd.read_csv('https://af-data-engineer-technical-exercise.s3.us-east-2.amazonaws.com/calendar.csv')
    logging.info('Calendar data loaded successfully.')
    
    person = pd.read_csv('https://af-data-engineer-technical-exercise.s3.us-east-2.amazonaws.com/person.csv')
    logging.info('Person data loaded successfully.')
    
    school = pd.read_csv('https://af-data-engineer-technical-exercise.s3.us-east-2.amazonaws.com/school.csv')
    logging.info('School data loaded successfully.')
    
    enrollment = pd.read_csv('https://af-data-engineer-technical-exercise.s3.us-east-2.amazonaws.com/enrollment.csv')
    logging.info('Enrollment data loaded successfully.')
    
    schoolyear = pd.read_csv('https://af-data-engineer-technical-exercise.s3.us-east-2.amazonaws.com/schoolyear.csv')
    logging.info('Schoolyear data loaded successfully.')
except Exception as e:
    logging.error(f"Error loading data: {e}")


2023-07-25 15:34:22 INFO: Calendar data loaded successfully.
2023-07-25 15:34:23 INFO: Person data loaded successfully.
2023-07-25 15:34:23 INFO: School data loaded successfully.
2023-07-25 15:34:23 INFO: Enrollment data loaded successfully.
2023-07-25 15:34:24 INFO: Schoolyear data loaded successfully.


In [8]:
try:
    # Connect to the database
    conn = sqlite3.connect('ic.db')

    # Load the data into the database
    calendar.to_sql('calendar', conn, if_exists='replace', index=False)
    logging.info('Calendar data loaded into SQLite database.')
    
    person.to_sql('person', conn, if_exists='replace', index=False)
    logging.info('Person data loaded into SQLite database.')
    
    school.to_sql('school', conn, if_exists='replace', index=False)
    logging.info('School data loaded into SQLite database.')
    
    enrollment.to_sql('enrollment', conn, if_exists='replace', index=False)
    logging.info('Enrollment data loaded into SQLite database.')
    
    schoolyear.to_sql('schoolyear', conn, if_exists='replace', index=False)
    logging.info('Schoolyear data loaded into SQLite database.')

    # Close the connection
    conn.close()
except Exception as e:
    logging.error(f"Error loading data into SQLite database: {e}")


2023-07-25 09:07:31 INFO: Calendar data loaded into SQLite database.
2023-07-25 09:07:31 INFO: Person data loaded into SQLite database.
2023-07-25 09:07:31 INFO: School data loaded into SQLite database.
2023-07-25 09:07:31 INFO: Enrollment data loaded into SQLite database.
2023-07-25 09:07:31 INFO: Schoolyear data loaded into SQLite database.


In [16]:
#Test the loading process by querying the database and checking the column names
# Connect to the database
conn = sqlite3.connect('ic.db')

# Create a cursor object
cursor = conn.cursor()

# List of table names
tables = ['calendar', 'person', 'school', 'enrollment', 'schoolyear']

# For each table, execute a SQL query to fetch the column names and compare them with the DataFrame columns
for table in tables:
    cursor.execute(f"PRAGMA table_info({table});")
    columns_db = [column[1] for column in cursor.fetchall()]
    columns_df = eval(table).columns.tolist()
    if columns_db == columns_df:
        logging.info(f"Column names for table {table} match between the SQLite database and the original DataFrame.")
        logging.info(f"Column names: {', '.join(columns_db)}")
    else:
        logging.error(f"Columns do not match for table {table}.")
        logging.error(f"Columns in SQLite database: {', '.join(columns_db)}")
        logging.error(f"Columns in original DataFrame: {', '.join(columns_df)}")

# Close the connection
conn.close()

2023-07-25 15:34:28 INFO: Column names for table calendar match between the SQLite database and the original DataFrame.
2023-07-25 15:34:28 INFO: Column names: calendarid, schoolid, calendar_startdate, calendar_enddate, endyear
2023-07-25 15:34:28 INFO: Column names for table person match between the SQLite database and the original DataFrame.
2023-07-25 15:34:28 INFO: Column names: personid, firstname, lastname
2023-07-25 15:34:28 INFO: Column names for table school match between the SQLite database and the original DataFrame.
2023-07-25 15:34:28 INFO: Column names: schoolid, zip, name
2023-07-25 15:34:28 INFO: Column names for table enrollment match between the SQLite database and the original DataFrame.
2023-07-25 15:34:28 INFO: Column names: enrollmentid, personid, calendarid, grade, startdate, enddate
2023-07-25 15:34:28 INFO: Column names for table schoolyear match between the SQLite database and the original DataFrame.
2023-07-25 15:34:28 INFO: Column names: startyear, endyear, 

In [17]:
#Write a function that will execute a SQL query and save changes to the database.
def execute_sql_query(db_name, query):
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()
    try:
        cursor.execute(query)
        conn.commit()
        logging.info(f"SQL query executed successfully: {query}")
    except Exception as e:
        logging.error(f"Error executing SQL query: {query}")
        logging.error(e)
    finally:
        conn.close()

In [18]:
#Write a function that will execute a SQL query and save changes to the database.

def execute_sql_query(db_name, query):
    try:
        conn = sqlite3.connect(db_name)
        cursor = conn.cursor()
        cursor.execute(query)
        conn.commit()
        conn.close()
        logging.info(f"SQL query executed successfully: {query}")
    except Exception as e:
        logging.error(f"Error executing SQL query: {e}")


In [19]:
#Update the schoolyear table to set the active schoolyear to 2020 using the above function.
def execute_sql_query(db_name, query):
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()
    try:
        cursor.execute(query)
        conn.commit()
        logging.info(f"SQL query executed successfully: {query}")
    except Exception as e:
        logging.error(f"Error executing SQL query: {query}")
        logging.error(e)
    finally:
        conn.close()

# The SQL query to update the schoolyear table
update_query = "UPDATE schoolyear SET active = 1 WHERE startyear = 2020"

# Use the function to execute the update query
execute_sql_query('ic.db', update_query)



2023-07-25 15:38:45 INFO: SQL query executed successfully: UPDATE schoolyear SET active = 1 WHERE startyear = 2020


In [30]:
#Execute the SQL queries to answer the questions.
#1. How many unique students are enrolled in the active school year?

def fetch_sql_query(db_name, query):
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()
    try:
        cursor.execute(query)
        rows = cursor.fetchall()
        logging.info(f"SQL query executed successfully: {query}")
        return rows
    except Exception as e:
        logging.error(f"Error executing SQL query: {query}")
        logging.error(e)
        return None
    finally:
        conn.close()
        
query1 = """
SELECT COUNT(DISTINCT e.personid)
FROM enrollment e
JOIN calendar c ON e.calendarid = c.calendarid
JOIN schoolyear sy ON c.endyear = sy.endyear
WHERE sy.active = 1;
"""

result1 = fetch_sql_query(db_name, query1)
logging.info(f"The number of unique students enrolled in the active school year is: {result1[0][0]}")


#Question 2 of SQL
#How many students were enrolled in school bp as of 10/1/2019?

query2 = """
SELECT COUNT(DISTINCT e.personid)
FROM enrollment e
JOIN calendar c ON e.calendarid = c.calendarid
JOIN school s ON c.schoolid = s.schoolid
WHERE s.name LIKE '%bp%' AND e.startdate <= '2019-10-01' AND (e.enddate > '2019-10-01' OR e.enddate IS NULL);"""

result2 = fetch_sql_query(db_name, query2)
logging.info(f"The number of students enrolled in school bp as of 10/1/2019 is: {result2[0][0]}")

#Question 3 of SQL
#Who are the 3 students with the most recent startdate of the students that have multiple enrollments in the active school year?

query3 = """
SELECT e.personid, MAX(e.startdate) as latest_startdate
FROM enrollment e
JOIN calendar c ON e.calendarid = c.calendarid
WHERE c.endyear = 2020
GROUP BY e.personid
HAVING COUNT(e.enrollmentid) > 1
ORDER BY latest_startdate DESC 
LIMIT 3;
"""

result3 = fetch_sql_query(db_name, query3)
logging.info("The 3 students with the most recent startdate of the students that have multiple enrollments in the active school year are:")
for row in result3:
    logging.info(f"Person ID: {row[0]}, Latest Start Date: {row[1]}")


2023-07-25 19:24:54 INFO: SQL query executed successfully: 
SELECT COUNT(DISTINCT e.personid)
FROM enrollment e
JOIN calendar c ON e.calendarid = c.calendarid
JOIN schoolyear sy ON c.endyear = sy.endyear
WHERE sy.active = 1;

2023-07-25 19:24:54 INFO: The number of unique students enrolled in the active school year is: 15760
2023-07-25 19:24:54 INFO: SQL query executed successfully: 
SELECT COUNT(DISTINCT e.personid)
FROM enrollment e
JOIN calendar c ON e.calendarid = c.calendarid
JOIN school s ON c.schoolid = s.schoolid
WHERE s.name LIKE '%bp%' AND e.startdate <= '2019-10-01' AND (e.enddate > '2019-10-01' OR e.enddate IS NULL);
2023-07-25 19:24:54 INFO: The number of students enrolled in school bp as of 10/1/2019 is: 88
2023-07-25 19:24:54 INFO: SQL query executed successfully: 
SELECT e.personid, MAX(e.startdate) as latest_startdate
FROM enrollment e
JOIN calendar c ON e.calendarid = c.calendarid
WHERE c.endyear = 2020
GROUP BY e.personid
HAVING COUNT(e.enrollmentid) > 1
ORDER BY lat

In [37]:
##################
################PYTHON PYTHON PYTHON PYTHON ##########################
######################################################################
######################################################################

#Using the dataframes previously created
###
###
###     Add city to the School df: API    ####

import requests
def get_city(zip_code):
    try:
        # Pad the zip code with a leading zero if necessary
        zip_code = str(zip_code).zfill(5)
        response = requests.get(f'http://api.zippopotam.us/us/{zip_code}')
        data = response.json()
        return data['places'][0]['place name']
    except Exception as e:
        logging.error(f"Error getting city for zip code {zip_code}: {e}")
        return None

# Create a new dataframe from the existing school dataframe
school_df_new = school.copy()

# Add the city column
school_df_new['city'] = school_df_new['zip'].apply(get_city)

# Print the result and log it
print(school_df_new.head())
logging.info(f"Added 'city' column to 'school' dataframe")


###########################
######
###       Add final_enddate to the Enrollment df: Input missing Data ####
##########################
# Create a new dataframe from the existing enrollment dataframe
enrollment_df_new = enrollment.copy()

# Add the final_enddate column
enrollment_df_new['final_enddate'] = enrollment_df_new['enddate'].fillna(enrollment_df_new['calendarid'].map(calendar.set_index('calendarid')['calendar_enddate']))

# Print the result and log it
print(enrollment_df_new.head())
logging.info("Added final_enddate to the enrollment dataframe.")


###########################
######
###       Export active enrollment back to the projects folder####
##########################
# Merge the enrollment df with the calendar df using the 'calendarid' column
merged_df = pd.merge(enrollment_df_new, calendar, left_on='calendarid', right_on='calendarid')

# Merge the resulting df with the school df using the 'schoolid' column
active_enrollment_df = pd.merge(merged_df, school_df_new, left_on='schoolid', right_on='schoolid')

# Select only the columns we need
columns_to_keep = ['schoolid', 'name', 'endyear', 'city', 'final_enddate']
active_enrollment_df = active_enrollment_df[columns_to_keep]

# Rename the 'name' column to 'schoolname'
active_enrollment_df = active_enrollment_df.rename(columns={'name': 'schoolname'})

# Export the resulting df as a csv
active_enrollment_df.to_csv('active_enrollment.csv', index=False)

# Print the result and log it
print(active_enrollment_df.head())
logging.info(f"Exported 'active_enrollment' dataframe to CSV")


2023-07-25 20:47:11 INFO: Added 'city' column to 'school' dataframe
2023-07-25 20:47:11 INFO: Added final_enddate to the enrollment dataframe.
2023-07-25 20:47:12 INFO: Exported 'active_enrollment' dataframe to CSV


   schoolid   zip name       city
0        28  6511   aa  New Haven
1        29  6511   ab  New Haven
2        30  6511   ac  New Haven
3        31  6513   ad  New Haven
4        32  6513   ae  New Haven
   enrollmentid  personid  calendarid grade   startdate enddate final_enddate
0        100268     24646         332    04  2018-08-16     NaN    2019-06-14
1        100093     26658         332    04  2018-08-16     NaN    2019-06-14
2        100228     30559         332    03  2018-08-16     NaN    2019-06-14
3        100238     32384         332    03  2018-08-16     NaN    2019-06-14
4        100184     32389         332    03  2018-08-16     NaN    2019-06-14
   schoolid schoolname  endyear       city final_enddate
0        28         aa     2019  New Haven    2019-06-14
1        28         aa     2019  New Haven    2019-06-14
2        28         aa     2019  New Haven    2019-06-14
3        28         aa     2019  New Haven    2019-06-14
4        28         aa     2019  New Haven 

In [33]:
print(enrollment.columns)

Index(['enrollmentid', 'personid', 'calendarid', 'grade', 'startdate',
       'enddate'],
      dtype='object')


In [34]:
print(calendar.columns)

Index(['calendarid', 'schoolid', 'calendar_startdate', 'calendar_enddate',
       'endyear'],
      dtype='object')
