# In [None]:
import json
import pandas as pd

import psycopg2
import psycopg2.extras as extras
from sqlalchemy import create_engine
import os 

# Connection settings for the source database (the data lake), taken from the
# process environment. A missing variable raises KeyError when the module is
# imported.
ipaddress, dbname, username, password, port = (
    os.environ[key]
    for key in ('ipaddress', 'dbname', 'username', 'password', 'port')
)

# Connection settings for the data warehouse. There is no separate port
# variable: the warehouse URL built in lambda_handler reuses ``port``.
ipaddress_dwh, dbname_dwh, username_dwh, password_dwh = (
    os.environ[key]
    for key in ('ipaddress_dwh', 'dbname_dwh', 'username_dwh', 'password_dwh')
)

def lambda_handler(event, context):
    """Copy ``tasty_clean`` from the data lake into the data warehouse.

    Reads every row of ``tasty_clean`` from the source database, drops rows
    whose ``name`` repeats (the first occurrence is kept), derives the
    ingredient and tag count tables, and writes all three tables to the
    warehouse, replacing any existing table of the same name.

    Args:
        event: Lambda invocation payload; not used.
        context: Lambda runtime context; not used.

    Returns:
        None.
    """
    # Imported here so the function does not depend on changes to the
    # module-level import block.
    from sqlalchemy import text

    # Labels are applied positionally to the ``SELECT *`` result, so this
    # order has to match the column order of the source table.
    columns = [
        'name', 'original_video_url', 'keywords', 'num_servings',
        'total_time_minutes', 'yields', 'country', 'id', 'prep_time_minutes',
        'description', 'cook_time_minutes', 'nutrition_fiber',
        'nutrition_protein', 'nutrition_fat', 'nutrition_calories',
        'nutrition_sugar', 'nutrition_carbohydrates',
        'user_ratings_count_positive', 'user_ratings_score',
        'user_ratings_count_negative', 'total_time_tier_display_tier',
        'topics_clean', 'tags_clean', 'credits_clean', 'instructions_clean',
        'ingredients',
    ]

    # NOTE(review): credentials are interpolated into the URL unescaped, so a
    # password containing characters such as '@' or '/' must already be
    # URL-encoded in the environment variable.
    postgres_str = f'postgresql://{username}:{password}@{ipaddress}:{port}/{dbname}'

    # load data from data lake
    source_engine = create_engine(postgres_str)
    try:
        # An explicit connection plus text() works on SQLAlchemy 1.x and 2.x;
        # Engine.execute() with a raw string no longer exists in 2.0.
        with source_engine.connect() as connection:
            result = connection.execute(text('''
          SELECT
          *
          FROM tasty_clean
          '''))
            # Plain tuples keep DataFrame construction independent of the
            # row class the installed SQLAlchemy version returns.
            rows = [tuple(row) for row in result.fetchall()]
    finally:
        # Release pooled connections instead of leaving them to the garbage
        # collector between invocations.
        source_engine.dispose()

    df = pd.DataFrame(rows, columns=columns)

    # delete duplicates
    df = df.drop_duplicates(subset=['name'])

    # Both helpers overwrite their source column (``ingredients`` and
    # ``tags_clean``) with parsed lists in place, so the frame written to the
    # warehouse below carries those lists rather than the raw strings.
    df_ingredients_counts = ingredients_tags(df)
    df_tags_counts = table_tags(df)

    # NOTE(review): the warehouse URL reuses the source ``port``; confirm that
    # both databases really listen on the same port.
    postgres_str_dwh = f'postgresql://{username_dwh}:{password_dwh}@{ipaddress_dwh}:{port}/{dbname_dwh}'

    # load tables to data warehouse
    warehouse_engine = create_engine(postgres_str_dwh)
    try:
        df.to_sql('tasty_clean', con=warehouse_engine, index=False, if_exists='replace')
        df_ingredients_counts.to_sql('ingredients_counts', con=warehouse_engine, index=False, if_exists='replace')
        df_tags_counts.to_sql('tags_counts', con=warehouse_engine, index=False, if_exists='replace')
    finally:
        warehouse_engine.dispose()
    

# create table ingredients_counts
def ingredients_tags(df):
    """Count in how many rows each ingredient appears.

    Each value of ``df['ingredients']`` is treated as a comma-separated list
    once the characters ``{``, ``}`` and ``"`` have been stripped. An
    ingredient is counted at most once per row, and rows with a missing value
    contribute nothing.

    Side effect: ``df['ingredients']`` is overwritten in place with the parsed
    lists (missing values stay missing). This is kept on purpose because the
    caller goes on to use the modified frame.

    Args:
        df: DataFrame with a string column ``ingredients``.

    Returns:
        DataFrame with columns ``ingredient`` and ``count``, ordered by
        descending count and restricted to ingredients that occur in more
        than 5 rows.
    """
    cleaned = df['ingredients']
    # regex=False makes the literal replacement explicit; the default of the
    # ``regex`` argument differs between pandas versions.
    for char in ('{', '}', '"'):
        cleaned = cleaned.str.replace(char, '', regex=False)

    # ``.str.split`` passes missing values through instead of raising the way
    # a plain ``x.split(",")`` does on NaN/None.
    df['ingredients'] = cleaned.str.split(',')

    # Splitting the indicator columns on the comma directly avoids the
    # round trip through '|', which would wrongly split names containing '|'.
    df_ingredients_dummies = cleaned.str.get_dummies(sep=',')

    # create new df with counts
    ingredients_counts = df_ingredients_dummies.sum().sort_values(ascending=False)
    df_ingredients_counts = (
        ingredients_counts.rename_axis('ingredient').reset_index(name='count')
    )

    # only ingredients which are used more than 5 times
    return df_ingredients_counts.loc[df_ingredients_counts['count'] > 5]
    
def table_tags(df):
    """Count in how many rows each tag appears.

    Each value of ``df['tags_clean']`` is treated as a comma-separated list
    once the characters ``{``, ``}`` and ``"`` have been stripped. A tag is
    counted at most once per row, and rows with a missing value contribute
    nothing.

    Side effect: ``df['tags_clean']`` is overwritten in place with the parsed
    lists (missing values stay missing). This is kept on purpose because the
    caller goes on to use the modified frame.

    Args:
        df: DataFrame with a string column ``tags_clean``.

    Returns:
        DataFrame with columns ``tag`` and ``count``, ordered by descending
        count.
    """
    cleaned = df['tags_clean']
    # regex=False makes the literal replacement explicit; the default of the
    # ``regex`` argument differs between pandas versions.
    for char in ('{', '}', '"'):
        cleaned = cleaned.str.replace(char, '', regex=False)

    # ``.str.split`` passes missing values through instead of raising the way
    # a plain ``x.split(",")`` does on NaN/None.
    df['tags_clean'] = cleaned.str.split(',')

    # Only the tag indicator columns are summed. Joining the ``id`` column in
    # before summing would add a meaningless "id" row (the sum of all ids) to
    # the tag counts.
    df_tags_dummies = cleaned.str.get_dummies(sep=',')

    # create new df with counts
    tags_counts = df_tags_dummies.sum().sort_values(ascending=False)
    return tags_counts.rename_axis('tag').reset_index(name='count')