# ETL Project



In [1]:
import pandas as pd
import numpy as np
import requests
import psycopg2   

import warnings
warnings.filterwarnings("ignore")


## Getting data from JSON/API into one dataframe

In [2]:
import json

# Alternative source: the openFDA API (kept for reference, not executed).
#url = "https://api.fda.gov/food/event.json?search=date_started:[20040101+TO+20210101]&limit=1000"
#url_data = requests.get(url).json()
#fda_food_event_df = pd.DataFrame(url_data["results"])

# This dataset is available on the openFDA website and contains all the records to date.
# It was downloaded on Jun 3, 2021.
# The context manager closes the file even if parsing fails, and the JSON is read as
# UTF-8 explicitly so the result does not depend on the platform's default encoding.
with open("food-event-0001-of-0001.JSON", encoding="utf-8") as json_file:
    # json.load returns the document as a dictionary
    report_data = json.load(json_file)

# One dataframe row per entry of the "results" list
fda_food_event_df = pd.DataFrame(report_data["results"])
    

In [3]:
# Display the raw frame. As the output below shows, reactions and outcomes hold lists,
# consumer holds a dict (possibly empty) and products holds a list of dicts.
fda_food_event_df

Unnamed: 0,report_number,outcomes,date_created,reactions,date_started,consumer,products
0,100176,[Other Outcome],20080206,"[OESOPHAGEAL INJURY, MOUTH INJURY, LACERATION]",20080130,{},"[{'role': 'SUSPECT', 'name_brand': 'SHIP AHOY ..."
1,101618,"[Medically Important, Patient Visited ER]",20080326,[CONSTIPATION],,"{'age': '73', 'age_unit': 'year(s)', 'gender':...","[{'role': 'SUSPECT', 'name_brand': 'CALTRATE 6..."
2,105036,"[Patient Visited Healthcare Provider, Medicall...",20080728,"[TRANSFUSION, SURGERY, INTESTINAL VILLI ATROPH...",,"{'age': '50', 'age_unit': 'year(s)', 'gender':...","[{'role': 'SUSPECT', 'name_brand': 'ONE A DAY ..."
3,103414,"[Patient Visited Healthcare Provider, Medicall...",20080521,"[RESPIRATORY DISORDER, ONYCHOMADESIS, NAIL INF...",20080216,"{'age': '51', 'age_unit': 'year(s)', 'gender':...","[{'role': 'SUSPECT', 'name_brand': 'TOTAL BODY..."
4,106170,[Other Outcome],20080911,"[INSOMNIA, DYSPNOEA, DYSPEPSIA]",20041007,{},"[{'role': 'SUSPECT', 'name_brand': 'HAAGEN DAZ..."
...,...,...,...,...,...,...,...
91771,93293,[Other Outcome],20070525,[MALAISE],,{},"[{'role': 'SUSPECT', 'name_brand': 'PETER PAN ..."
91772,94117,"[Patient Visited Healthcare Provider, Medicall...",20070627,"[WEIGHT DECREASED, VITAMIN B12 INCREASED, HEPA...",20061115,{},"[{'role': 'CONCOMITANT', 'name_brand': 'SCHIFF..."
91773,94832,"[Hospitalization, Patient Visited ER]",20070724,"[WHITE BLOOD CELL COUNT INCREASED, NIGHTMARE, ...",20070710,"{'age': '60', 'age_unit': 'year(s)', 'gender':...","[{'role': 'SUSPECT', 'name_brand': 'SCHIFF MEL..."
91774,98153,[Patient Visited Healthcare Provider],20071120,"[VISION BLURRED, SKIN LESION, SKIN EXFOLIATION...",20071109,"{'age': '45', 'age_unit': 'year(s)', 'gender':...","[{'role': 'SUSPECT', 'name_brand': 'JAN MARINI..."


## Breaking the single dataframe into reactions, outcomes, products and event with consumer details

### report reactions dataframe

In [4]:
# The reactions column of fda_food_event_df holds a list per report; explode() turns
# every list element into its own row (one row per report/reaction pair).
# Reaction names are title-cased, rows with missing values are removed, and repeated
# report/reaction pairs (observed in the data) are collapsed to a single row.
report_reactions_df = (
    fda_food_event_df[["report_number", "reactions"]]
    .explode("reactions")
    .assign(reactions=lambda exploded: exploded["reactions"].str.title())
    .dropna()
    .drop_duplicates()
)
report_reactions_df

Unnamed: 0,report_number,reactions
0,100176,Oesophageal Injury
0,100176,Mouth Injury
0,100176,Laceration
1,101618,Constipation
2,105036,Transfusion
...,...,...
91774,98153,Eye Irritation
91774,98153,Eye Inflammation
91774,98153,Erythema Of Eyelid
91774,98153,Conjunctival Hyperaemia


### report outcomes dataframe

In [5]:
# The outcomes column of fda_food_event_df holds a list per report; explode() turns
# every list element into its own row (one row per report/outcome pair).
# The report_outcome table declares PRIMARY KEY(report_number, outcome), so rows with a
# missing outcome (empty list -> NaN after explode) and repeated report/outcome pairs
# are removed here, mirroring the cleaning applied to the reactions dataframe.
report_outcomes_df = (
    fda_food_event_df[["report_number", "outcomes"]]
    .explode("outcomes")
    .dropna()
    .drop_duplicates()
)
report_outcomes_df

Unnamed: 0,report_number,outcomes
0,100176,Other Outcome
1,101618,Medically Important
1,101618,Patient Visited ER
2,105036,Patient Visited Healthcare Provider
2,105036,Medically Important
...,...,...
91773,94832,Hospitalization
91773,94832,Patient Visited ER
91774,98153,Patient Visited Healthcare Provider
91775,98410,Hospitalization


### report products dataframe

In [6]:
# The products column of fda_food_event_df holds a list of dicts per report.
# explode() yields one row per product; the index is reset so the rows line up
# positionally with the frame built from the product dicts below.
products_exploded = (
    fda_food_event_df[["report_number", "products"]]
    .explode("products")
    .reset_index(drop=True)
)

# Expand each product dict: keys become columns, values become the row
product_fields = pd.DataFrame(products_exploded["products"].tolist())

# report_number followed by the product columns; the raw products column is not kept
report_products_df = pd.concat(
    [products_exploded[["report_number"]], product_fields], axis=1
)

# Strip both the literal two-character sequence "\n" and real newline characters from
# the brand name. Both calls go through the .str accessor with regex=False so the result
# is the same on every pandas version (a plain Series.replace only matches whole values,
# and an embedded newline would split a row in the '^'-separated load file).
report_products_df["name_brand"] = (
    report_products_df["name_brand"]
    .str.replace("\\n", "", regex=False)
    .str.replace("\n", "", regex=False)
    .str.title()
)
report_products_df["role"] = report_products_df["role"].str.title()

# Keep the de-duplicated frame (drop_duplicates returns a new frame, it does not
# modify in place), so the duplicates are also absent from what gets loaded.
report_products_df = report_products_df.drop_duplicates()
report_products_df

Unnamed: 0,report_number,role,name_brand,industry_code,industry_name
0,100176,Suspect,Ship Ahoy Usa Wild Alaskan Salmon,16,Fishery/Seafood Prod
1,101618,Suspect,Caltrate 600+ D Plus Mineral Calcium Supplement,54,Vit/Min/Prot/Unconv Diet(Human/Animal)
2,101618,Concomitant,Cod-Liver(Cod-Liver Oil),54,Vit/Min/Prot/Unconv Diet(Human/Animal)
3,101618,Concomitant,B100,54,Vit/Min/Prot/Unconv Diet(Human/Animal)
4,105036,Suspect,One A Day Women'S Tablet,54,Vit/Min/Prot/Unconv Diet(Human/Animal)
...,...,...,...,...,...
131256,94117,Concomitant,Theragram-M,54,Vit/Min/Prot/Unconv Diet(Human/Animal)
131257,94117,Concomitant,Flax Seed Oil,54,Vit/Min/Prot/Unconv Diet(Human/Animal)
131258,94832,Suspect,Schiff Melatonin Plus,54,Vit/Min/Prot/Unconv Diet(Human/Animal)
131259,98153,Suspect,Jan Marini Age Intervention Eyelash,53,Cosmetics


### consumer dataframe

In [7]:
# Each entry of the consumer column is a dict (possibly empty). Building a frame from
# the list of dicts turns every key into a column and every report into a row.
consumer_df = pd.DataFrame(fda_food_event_df["consumer"].tolist())
consumer_df

Unnamed: 0,age,age_unit,gender
0,,,
1,73,year(s),F
2,50,year(s),F
3,51,year(s),F
4,,,
...,...,...,...
91771,,,
91772,,,
91773,60,year(s),M
91774,45,year(s),F


### event report dataframe with consumer details

In [8]:
# Event-level columns of each report, joined (on the row index) with the consumer
# columns built in the previous step.
event_columns = ["report_number", "date_created", "date_started"]
event_report_df = fda_food_event_df.loc[:, event_columns].join(consumer_df)
event_report_df

Unnamed: 0,report_number,date_created,date_started,age,age_unit,gender
0,100176,20080206,20080130,,,
1,101618,20080326,,73,year(s),F
2,105036,20080728,,50,year(s),F
3,103414,20080521,20080216,51,year(s),F
4,106170,20080911,20041007,,,
...,...,...,...,...,...,...
91771,93293,20070525,,,,
91772,94117,20070627,20061115,,,
91773,94832,20070724,20070710,60,year(s),M
91774,98153,20071120,20071109,45,year(s),F


## Drop tables function

In [9]:
#Drop table script for dropping each table, if exists in FDA_food_event database
def drop_tables(cursor):
    """Drop the four FDA_food_event tables if they exist.

    The tables referencing event_report (report_product, report_reaction,
    report_outcome) are dropped before event_report itself.

    Parameters
    ----------
    cursor : object with an ``execute(sql)`` method; each statement is executed
        on it in order. Nothing is committed here.
    """
    statements = (
        """ DROP TABLE IF EXISTS report_product
        """,
        """
        DROP TABLE IF EXISTS report_reaction
        """,
        """
        DROP TABLE IF EXISTS report_outcome
        """,
        """
        DROP TABLE IF EXISTS event_report
        """ )
    # issue the DROP statements in the order listed above
    for sql in statements:
        cursor.execute(sql)
        

## Create tables function

In [10]:
#Create table script for creating all the tables in FDA_food_event database
def create_tables(cursor):
    """Create the four FDA_food_event tables.

    event_report is created first because report_product, report_reaction and
    report_outcome each declare a foreign key on event_report(report_number).

    Parameters
    ----------
    cursor : object with an ``execute(sql)`` method; each CREATE TABLE statement
        is executed on it in order. Nothing is committed here.
    """
    ddl_statements = (
        """
        CREATE TABLE event_report
        ( 
             report_number VARCHAR(25) PRIMARY KEY,
             date_created DATE NOT NULL,
             date_started DATE,
             consumer_age INT,
             consumer_age_unit VARCHAR(10),
             consumer_gender CHAR(2)
         )
        """,
        """
        CREATE TABLE report_product
         (
             report_number VARCHAR(25) not null,
             role VARCHAR(15),
             name_brand VARCHAR(255),
             industry_code CHAR(3),
             industry_name VARCHAR(100),
             FOREIGN KEY(report_number) REFERENCES event_report(report_number)
         )
        """,
        """
        CREATE TABLE report_reaction
         (   report_number VARCHAR(25) not null,
             reaction VARCHAR(75), 
             PRIMARY KEY(report_number,reaction),
             FOREIGN KEY(report_number) REFERENCES event_report(report_number)
         );
        """,
        """
        CREATE TABLE report_outcome
         (
             report_number VARCHAR(25) not null,
             outcome VARCHAR(75),
             PRIMARY KEY(report_number,outcome),
             FOREIGN KEY(report_number) REFERENCES event_report(report_number)
         );
        """
        )
    # run the CREATE statements in the order listed (parent table first)
    for ddl in ddl_statements:
        cursor.execute(ddl)
        

## Function to load tables from a CSV file using copy_from()

In [11]:
#Copy data from .csv file to table in FDA_food_event database. Using '^' as separator for .csv file.
def copy_data_from_file(cursor, df, table):
    """
    Bulk-load a dataframe into a table through an intermediate CSV file.

    The dataframe is written to ``./tmp_<table>.csv`` (no header, no index,
    '^' as field separator) and the file is then streamed to the database with
    ``cursor.copy_from()``, with empty fields passed as NULL (``null=''``).
    The temporary file is left on disk after the load.

    Parameters
    ----------
    cursor : cursor object exposing ``copy_from(file, table, sep=..., null=...)``
    df : pandas.DataFrame to load; no column list is passed to ``copy_from``,
        so the frame's column order is what gets sent
    table : str, name of the target table (also used in the temp file name)
    """
    csv_path = "./tmp_" + table + ".csv"

    # Write and read with the same explicit encoding so the round trip does not
    # depend on the platform's default locale encoding.
    df.to_csv(csv_path, index=False, header=False, sep='^', encoding="utf-8")

    # The context manager guarantees the file handle is closed, even if the copy fails.
    with open(csv_path, 'r', encoding="utf-8") as csv_file:
        cursor.copy_from(csv_file, table, sep="^", null='')
    

## Function to load FDA_food_event database

In [12]:
#to get database connection parameters like host,database,user and password
from config import config
def load_FDA_food_event_database():
    """Recreate the FDA_food_event tables and load the four dataframes into them.

    Steps, all inside one transaction:
      1. read the connection parameters via ``config()`` and connect,
      2. silence NOTICE messages for the session,
      3. drop and recreate the tables,
      4. bulk-copy event_report first, then the three tables that reference it,
      5. commit.

    On any error the message is printed and the transaction is rolled back (when a
    connection exists); the error is not re-raised. The cursor and the connection
    are always closed.
    """
    conn = None
    cursor = None
    try:
        # read the connection parameters (host, database, user, password)
        params = config()

        # connect to the PostgreSQL server
        conn = psycopg2.connect(**params)
        cursor = conn.cursor()

        # Set client_min_messages to 'ERROR' to avoid reporting NOTICE
        cursor.execute(""" SET client_min_messages='ERROR' """)

        drop_tables(cursor)
        create_tables(cursor)

        # event_report goes first: the other tables hold foreign keys to it
        load_plan = (
            (event_report_df, 'event_report'),
            (report_reactions_df, 'report_reaction'),
            (report_outcomes_df, 'report_outcome'),
            (report_products_df, 'report_product'),
        )
        for frame, table_name in load_plan:
            copy_data_from_file(cursor, frame, table_name)

        # commit the changes
        conn.commit()
    except Exception as error:  # psycopg2.DatabaseError is an Exception subclass
        print(error)
        # conn is still None when config() or connect() failed; nothing to roll back then
        if conn is not None:
            conn.rollback()
    finally:
        # close the cursor before its connection; either may not have been created
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()


## Call function to load FDA_food_event database

In [13]:
# Load the FDA_food_event database: connect, recreate the four tables and bulk-copy
# the dataframes prepared above into them.
load_FDA_food_event_database()