# In [None]:
import os

import pandas as pd
from metaflow import FlowSpec, step
from sqlalchemy import create_engine, text as sa_text

# SQLAlchemy engine shared by every step of the flow.
# The connection URL is taken from the DATABASE_URL environment variable so that
# credentials do not have to live in source control; the original local
# development URL is kept as the fallback when the variable is not set.
engine = create_engine(
    os.environ.get('DATABASE_URL', 'postgresql://postgres:123@localhost:5432/taskDb')
)


class MetaflowETL(FlowSpec):
    """ETL flow for the 2019 New York City Airbnb listings dataset.

    Graph: ``start -> transform -> viewMetrics -> end``.

    * ``start``       loads ``AB_NYC_2019.csv`` into ``airbnb_ny_2019_raw``.
    * ``transform``   reads the raw table back, writes the ``dim_host`` and
                      ``dim_neighbourhood`` dimension tables, drops columns that
                      are redundant or derivable, and fills missing values.
    * ``viewMetrics`` prints average-price metrics for this run's data.
    * ``end``         publishes the transformed listings as
                      ``airbnb_ny_2019_processed`` through a staging table.

    Metaflow steps take no arguments besides ``self``; data is handed from one
    step to the next as artifacts (attributes assigned on ``self``).
    All database access goes through the module-level ``engine``.
    """

    @step
    def start(self):
        """Step 1: extract the CSV and push the raw rows to PostgreSQL."""
        df = pd.read_csv('AB_NYC_2019.csv')
        # if_exists='replace' works both on the first run (table absent) and on
        # re-runs (the default 'fail' raises once the table exists, and TRUNCATE
        # fails when it does not). index=False keeps the pandas RangeIndex from
        # being written as an extra 'index' column.
        df.to_sql('airbnb_ny_2019_raw', engine, if_exists='replace', index=False)
        self.next(self.transform)

    @step
    def transform(self):
        """Step 2: normalise the raw listings and handle missing values.

        Writes the ``dim_host`` and ``dim_neighbourhood`` tables, then stores
        the slimmed listings as ``self.df`` and the neighbourhood dimension as
        ``self.dim_neighbourhood`` for the following steps.
        """
        # Read the data from SQL into a DataFrame.
        df = pd.read_sql_query(sa_text('SELECT * FROM airbnb_ny_2019_raw'), con=engine)

        # host_name repeats for every listing of the same host_id: keep it in a
        # dimension table and join back on host_id when it is needed.
        dim_host = df[['host_id', 'host_name']].drop_duplicates()
        dim_host.to_sql('dim_host', engine, if_exists='replace', index=False)

        # neighbourhood_group repeats for every listing of a neighbourhood: keep
        # it in a dimension table keyed by neighbourhood.
        dim_neighbourhood = df[['neighbourhood', 'neighbourhood_group']].drop_duplicates()
        dim_neighbourhood.to_sql('dim_neighbourhood', engine, if_exists='replace', index=False)

        # host_name / neighbourhood_group now live in the dimension tables, and
        # calculated_host_listings_count is derivable (COUNT(*) ... GROUP BY host_id).
        df = df.drop(columns=['host_name', 'neighbourhood_group', 'calculated_host_listings_count'])

        # Replace missing values with defaults.
        df['reviews_per_month'] = df['reviews_per_month'].fillna(0.0)
        df['last_review'] = df['last_review'].fillna('1990-01-01')

        self.df = df
        self.dim_neighbourhood = dim_neighbourhood
        self.next(self.viewMetrics)

    @step
    def viewMetrics(self):
        """Step 3: compute, store and print average-price metrics.

        Metrics are computed from ``self.df`` (this run's transformed data)
        rather than from ``airbnb_ny_2019_processed``, which is only rewritten
        later in ``end`` and would otherwise hold the previous run's data.
        Results are kept as artifacts so they can be inspected after the run.
        """
        df = self.df

        # Average price per neighbourhood.
        self.avg_price_per_neighbourhood = (
            df.groupby('neighbourhood')['price'].mean().round(3)
            .reset_index(name='Average Price Per Neighbourhood')
        )
        print(self.avg_price_per_neighbourhood)

        # Average price per neighbourhood_group. The group column was moved to
        # the neighbourhood dimension in `transform`, so join it back first.
        with_group = df.merge(self.dim_neighbourhood, on='neighbourhood', how='left')
        self.avg_price_per_neighbourhood_group = (
            with_group.groupby('neighbourhood_group')['price'].mean().round(3)
            .reset_index(name='Average Price Per Neighbourhood Group')
        )
        print(self.avg_price_per_neighbourhood_group)

        # Average price per room_type.
        self.avg_price_per_room_type = (
            df.groupby('room_type')['price'].mean().round(3)
            .reset_index(name='Average Price Per Room Type')
        )
        print(self.avg_price_per_room_type)

        self.next(self.end)

    @step
    def end(self):
        """Step 4: publish the transformed listings as airbnb_ny_2019_processed.

        The data is loaded into a staging table first. The old table is swapped
        out only after that load succeeds, and the drop + rename run in one
        transaction, so a failure leaves the previously published table intact.
        """
        self.df.to_sql(
            'airbnb_ny_2019_processed_stage', engine, if_exists='replace', index=False
        )

        # engine.begin() commits on success and rolls back on any error;
        # PostgreSQL DDL is transactional, so the swap is all-or-nothing.
        # IF EXISTS lets the very first run succeed before the table exists.
        with engine.begin() as conn:
            conn.execute(sa_text('DROP TABLE IF EXISTS airbnb_ny_2019_processed'))
            conn.execute(sa_text(
                'ALTER TABLE airbnb_ny_2019_processed_stage '
                'RENAME TO airbnb_ny_2019_processed'
            ))

if __name__ == "__main__":
    # Creating the flow object hands control to Metaflow's command-line
    # interface (e.g. `python <this file> run`).
    MetaflowETL()
    