# DS 3002 Data Project 1

##### Joseph Lee (sl5nj)

### ETL data processor 

Due on March 21th at 11:59 PM

1.	Deliverable: Author a segment of an ETL pipeline that will ingest or process raw data. You must also submit a URL to a GitHub repository for your solution. In python you’ll need to know how to open files, iterate files, pattern match and output files.
2.	Benchmarks: 
i.	Your data processor should be able to ingest a pre-defined data source and perform at least three of these operations: 
1.	Fetch / download / retrieve a remote data file by URL, or ingest a local file mounted. Suggestions for remote data sources are listed at the end of this document. 
2.	Convert the general format and data structure of the data source (from JSON to CSV, from CSV to JSON, from JSON into a SQL database table, etc. I want the option to convert any source to any target. So, if I get a CSV as an input, I want the user to choose an output)
a)	EXTRA – Use an API (like twitter) to pull information realtime.
3.	Modify the number of columns from the source to the destination, reducing or adding columns. 
4.	The converted (new) file should be written to disk (local file) or written to a SQL database. 
5.	Generate a brief summary of the data file ingestion including: 
a)	Number of records 
b)	Number of columns 
ii.	The processor should produce informative errors should it be unable to complete an operation. (Try / Catch with error messages)
3.	Grading: 
i.	o Successful build of the solution (I recommend Python…but you can use whatever you like. I just need to be able to run it)
ii.	o    Functionality that meets all benchmarks – 10 points 
iii.	o Creativity / Innovation / Quality – 2 points 
iv.	o Documentation – Describes how to use the data processor and the elements that make it operational – 3 points 

#### Import packages Needed

In [1]:
import os
import numpy
import csv, json
import pandas as pd
import sqlite3
from sqlite3 import Error

#### Load csv 

In [2]:
# Data collected from Kaggle
# # https://www.kaggle.com/padhmam/qs-world-university-rankings-2017-2022
univ = pd.read_csv("university_ranking.csv")

In [3]:
univ.head()

Unnamed: 0,university,year,rank_display,score,country,city,region,type,research_output,student_faculty_ratio,international_students,size,faculty_count
0,Massachusetts Institute of Technology (MIT),2017,1,100.0,United States,Cambridge,North America,Private,Very High,4.0,3730,M,3065
1,Stanford University,2017,2,98.7,United States,Stanford,North America,Private,Very High,3.0,3879,L,4725
2,Harvard University,2017,3,98.3,United States,Cambridge,North America,Private,Very High,5.0,5877,L,4646
3,University of Cambridge,2017,4,97.2,United Kingdom,Cambridge,Europe,Public,Very high,4.0,7925,L,5800
4,California Institute of Technology (Caltech),2017,5,96.9,United States,Pasadena,North America,Private,Very High,2.0,692,S,968


#### Function to Create Database

In [4]:
def create_connection(db):
    """ 
    Purpose: Create a Database with given File Path and Name
             The function also counts for an error when it creates a database
             
    Input: 
        db   String File Path with Name of Db with .db extension
    
    Output: None, it creates .db file into the given path
    """
    conn = None
    try:
        conn = sqlite3.connect(db)
        print("Database Successfully Created")
    except Error as e:
        print(e)
    finally:
        if conn:
            conn.close()

#### Create Database

In [5]:
create_connection(r"univ.db")

Database Successfully Created


#### Create tables into Database

In [6]:
"""
This cell block creates a table into the database created
The query is first stored into a string called "create_tables_into_univ_db"
Then it tries to execute and commit into the univ.db
Upon successfull attempt, it prints message
It also accounts for errors 
"""

create_tables_into_univ_db = '''CREATE TABLE if not exists university (
                                id int PRIMARY KEY,
                                university longtext,
                                year int not null,
                                rank_display int not null,
                                score decimal(19,4),
                                country varchar(60) not null,
                                city varchar(60), 
                                region varchar(60),
                                type varchar(60),
                                research_output varchar(30),
                                student_faculty_ratio int,
                                international_students int,
                                size varchar(5),
                                faculty_count int); '''

conn = sqlite3.connect("univ.db")
cur = conn.cursor()
cur.execute(create_tables_into_univ_db)
try:
    conn.commit()
    print("Table Successfully Created")

except sqlite3.Error as error:
    print("Error while creating table", error)


Table Successfully Created


#### Function to convert CSV to JSON

In [7]:
def csv_to_json(csvFilePath, jsonFilePath):
    """
    Purpose: Converting CSV file into JSON file
    
    Inputs: 
        csvFilePath   String
        jsonFilePath  String
        
    Output: Creates converted JSON file to the given json file path 
    """
    jsonArray = []
      
    #read csv file
    with open(csvFilePath, encoding='utf-8') as csvfile: 
        #load csv file using the dictionary reader from the csv library
        csvfile = csv.DictReader(csvfile) 

        #convert each csv rows into dictionary
        for rows in csvfile: 
            #add the dictionary to json array
            jsonArray.append(rows)
  
    #convert python jsonArray to JSON String and write to file
    with open(jsonFilePath, 'w', encoding='utf-8') as jsonfile: 
        jsonstring = json.dumps(jsonArray, indent=4)
        jsonfile.write(jsonstring)

In [9]:
csvFilePath = r'university_ranking.csv'
jsonFilePath = r'university_ranking.json'
 
# Call the make_json function
csv_to_json(csvFilePath, jsonFilePath)

#### Function to convert JSON to CSV

In [20]:
def json_to_csv(jsonFilePath, csvFileName):
    """
    Purpose: This function is to convert JSON file to CSV
    
    Inputs: 
        jsonFilePath        String
        csvFileName         String
        
    Output:
        None, this will create a csv file to the working directory
    """
    with open(jsonFilePath, encoding='utf-8') as inputfile:
        df = pd.read_json(inputfile)

    df.to_csv(csvFileName, encoding='utf-8', index=False)


In [21]:
jsonFilePath = r'university_ranking.json'
csvFileName = r'converted_from_JSON.csv'
json_to_csv(jsonFilePath, csvFileName)

#### Function to load JSON and insert into SQL

In [10]:
db_path = "univ.db"
wd =os.getcwd()
json_file_path = '%s/%s' % (wd, 'university_ranking.json')

try:
    with open(json_file_path, encoding="utf8") as data:
        json_file=json.load(data)
    print("Successfully loaded JSON file")
except IOError as e:
    print (e)
    print("Error: can not open the file")

Successfully loaded JSON file


In [11]:
def json_to_sql(json_file, db_path):
    """
    Purpose: Convert JSON file to Pandas Dataframe then insert file 
             into the database
        
    Inputs:
        json_file   list
        db_path     String
        
    Output: Print message of either the successful insertion or an error 
    """
    conn = sqlite3.connect(db_path)
    
    df = pd.DataFrame(json_file)
    cur = conn.cursor()
    try:
        df.to_sql("university", conn, if_exists= "replace", index = False)
        print("Successfully Inserted File into SQLite Table")
    except:
        print("Failed to Insert File into SQLite Table")
    cur.close()
    

In [12]:
json_to_sql(json_file, db_path)

Successfully Inserted File into SQLite Table


#### Test to See if Database is Created Correctly

In [13]:
def testSqliteTable():
    """
    Purpose: Test if the database, table, and data are created correctly
             It connects to the database from the given path and select 
             all from the university table then it prints the length of the
             records
             The second query gets table_info of the table and prints length
             of the columns
             This function also accounts for errors
             Upon successful queries, it closes connection and print message
    
    Input: None
    
    Output: Number of records(rows) in the table : 6482
            Number of columns in the table : 13
            Optional:
                Commented out parts print all 6482 records from the table
    """
    try:
        conn = sqlite3.connect('univ.db')
        cur = conn.cursor()

        query = """SELECT * from university"""
        cur.execute(query)
        records = cur.fetchall()
        print("Number of records(rows) are: ", len(records))
        #print("Printing each row")
        #for row in records:
        #    print("university: ", row[0])
        #    print("year: ", row[1])
        #    print("rank: ", row[2])
        #    print("score: ", row[3])
        #    print("country: ", row[4])
        #    print("city: ", row[5])
        #    print("region: ", row[6])
        #    print("type: ", row[7])
        #    print("research_output: ", row[8])
        #    print("student_faculty_ratio: ", row[9])
        #    print("international_students: ", row[10])
        #    print("size: ", row[11])
        #    print("faculty_count: ", row[12])
        #    print("\n")
        cur.execute("PRAGMA table_info(university)")
        col = cur.fetchall()
        print("Number of columns are: ",len(col))
        cur.close()

    except sqlite3.Error as error:
        print("Failed to read data from sqlite table", error)
    finally:
        if conn:
            conn.close()
            print("The SQLite connection is closed")

testSqliteTable()


Number of records(rows) are:  6482
Number of columns are:  13
The SQLite connection is closed


#### Updating the Table

In [14]:
def update_table(db_path, query):
    """
    Purpose: This function is to update the table with query 
             You pass in the database file path and a string query to do operations
             
    Inputs: 
        db_path     String
        query       String
        
    Output: None
            Message will be printed either the attempt succeeded or failed
    """
    try:
        conn = sqlite3.connect(db_path)
        cur = conn.cursor()
        print("Connected to the Database")

        cur.execute(query)
        conn.commit()
        print("Record updated successfully ")
        cur.close()

    except sqlite3.Error as error:
        print("Failed to update sqlite table", error)
    finally:
        if conn:
            conn.close()
            print("The connection is closed")

In [15]:
db_path = "univ.db"
drop_query = """
        alter table university drop column faculty_count
        """
update_table(db_path, drop_query)

Connected to the Database
Record updated successfully 
The connection is closed


#### Inspecting the Database After Updating Number of Columns

In [16]:
def finalTestTable():
    """
    Purpose: Test if the database, table, and data are created correctly
             It connects to the database from the given path and select 
             all from the university table then it prints the length of the
             records
             The second query gets table_info of the table and prints length
             of the columns
             This function also accounts for errors
             Upon successful queries, it closes connection and print message
    
    Input: None
    
    Output: Number of records(rows) in the table : 6482
            Number of columns in the table : 12
            Optional:
                Commented out parts print all 6482 records from the table
    """
    try:
        conn = sqlite3.connect('univ.db')
        cur = conn.cursor()

        query = """SELECT * from university"""
        cur.execute(query)
        records = cur.fetchall()
        print("Number of records(rows) are: ", len(records))
        
        cur.execute("PRAGMA table_info(university)")
        col = cur.fetchall()
        print("Number of columns are: ",len(col))
        cur.close()

    except sqlite3.Error as error:
        print("Failed to read data from sqlite table", error)
    finally:
        if conn:
            conn.close()
            print("The SQLite connection is closed")

finalTestTable()


Number of records(rows) are:  6482
Number of columns are:  12
The SQLite connection is closed
