In [176]:
import psycopg2
import logging
import os
import pandas as pd

# Base directory for log output. NOTE: this is the current working directory of
# the kernel/process, which is not necessarily where the notebook file lives.
script_dir = os.getcwd()

# Define the path to the logs folder
logs_dir = os.path.join(script_dir, 'logs')

# Ensure the logs directory exists. exist_ok=True makes this a single call and
# avoids the check-then-create race of testing os.path.exists() first.
os.makedirs(logs_dir, exist_ok=True)

# Configure logging to store logs in the logs folder.
# basicConfig only takes effect the first time the root logger is configured,
# so re-running this cell in the same kernel is a no-op.
logging.basicConfig(
    filename=os.path.join(logs_dir, 'load.log'),
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s'
)


# Connecting to PostgreSQL database
###################################
def connect_db(dbname):
    """
    Open a psycopg2 connection to a PostgreSQL database.

    Connection settings are read from environment variables, with fallbacks:

    - ``DB_NAME``: falls back to the ``dbname`` argument.
    - ``DB_USER``: falls back to ``'postgres'``.
    - ``DB_PASSWORD``: no fallback; see below.
    - ``DB_HOST``: falls back to ``'localhost'``.
    - ``DB_PORT``: falls back to ``5432``.

    NOTE: when ``DB_NAME`` is set it takes precedence over ``dbname``, so every
    call connects to the same database regardless of the argument passed.

    The password is never hard-coded. If ``DB_PASSWORD`` is unset, ``None`` is
    passed; psycopg2 omits ``None`` keyword arguments from the connection
    string, so libpq's own defaults (``PGPASSWORD``, ``~/.pgpass``) apply.

    Args:
        dbname: Database name to use when ``DB_NAME`` is not set.

    Returns:
        An open connection object, or ``None`` if the connection attempt
        failed (the error is logged).
    """
    try:
        conn = psycopg2.connect(
            dbname=os.getenv('DB_NAME', dbname),
            user=os.getenv('DB_USER', 'postgres'),
            # No default here on purpose: credentials must come from the environment.
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST', 'localhost'),
            port=os.getenv('DB_PORT', 5432)
        )
        logging.info("Connected to database successfully")
        return conn
    except Exception as error:
        # Best-effort contract: callers check for None instead of catching.
        logging.error(f"Error connecting to the database: {error}")
        return None


# (table name, DDL) pairs in creation order. Dimensions come first because
# fact_disasters declares foreign keys to all of them.
# NOTE(review): dim_mag_scales.dis_mag_scalle looks like a typo of
# "dis_mag_scale"; it is left as-is because renaming it would change the schema.
_DISASTER_TABLE_DDL = (
    ("dim_disaster_groups", """--sql
            CREATE TABLE IF NOT EXISTS dim_disaster_groups (
                group_id INT PRIMARY KEY,
                group_name VARCHAR,
                parent_group_id INT REFERENCES dim_disaster_groups(group_id)
            );
            """),
    ("dim_disaster_types", """--sql
            CREATE TABLE IF NOT EXISTS dim_disaster_types (
                type_id INT PRIMARY KEY,
                type_name VARCHAR,
                parent_type_id INT REFERENCES dim_disaster_types(type_id)
            );
            """),
    ("dim_disaster_names", """--sql
            CREATE TABLE IF NOT EXISTS dim_disaster_names (
                name_id INT PRIMARY KEY,
                name VARCHAR
            );
            """),
    ("dim_locations", """--sql
            CREATE TABLE IF NOT EXISTS dim_locations (
                location_id INT PRIMARY KEY,
                longitude DOUBLE PRECISION,
                latitude DOUBLE PRECISION,
                country VARCHAR,
                country_code VARCHAR,
                city VARCHAR,
                state VARCHAR
            );
            """),
    ("dim_dates", """--sql
            CREATE TABLE IF NOT EXISTS dim_dates (
                date_id INT PRIMARY KEY,
                disaster_date DATE
            );
            """),
    ("dim_associated_distructions", """--sql
            CREATE TABLE IF NOT EXISTS dim_associated_distructions (
                associated_dis_id INT PRIMARY KEY,
                associated_dis VARCHAR,
                parent_id INT REFERENCES dim_associated_distructions(associated_dis_id)
            );
            """),
    ("dim_ofda_responses", """--sql
            CREATE TABLE IF NOT EXISTS dim_ofda_responses (
                OFDA_resp_id INT PRIMARY KEY,
                OFDA_resp VARCHAR
            );
            """),
    ("dim_appeals", """--sql
            CREATE TABLE IF NOT EXISTS dim_appeals (
                appeal_id INT PRIMARY KEY,
                appeal VARCHAR
            );
            """),
    ("dim_declarations", """--sql
            CREATE TABLE IF NOT EXISTS dim_declarations (
                declaration_id INT PRIMARY KEY,
                declaration VARCHAR
            );
            """),
    ("dim_mag_scales", """--sql
            CREATE TABLE IF NOT EXISTS dim_mag_scales (
                dis_mag_scale_id INT PRIMARY KEY,
                dis_mag_scalle VARCHAR
            );
            """),
    ("dim_adm_levels", """--sql
            CREATE TABLE IF NOT EXISTS dim_adm_levels (
                adm_level_id INT PRIMARY KEY,
                adm_level INT
            );
            """),
    ("fact_disasters", """--sql
            CREATE TABLE IF NOT EXISTS fact_disasters (
                disaster_id INT PRIMARY KEY,
                seq INT,
                glide VARCHAR,
                starting_date_id INT REFERENCES dim_dates(date_id),
                ending_date_id INT REFERENCES dim_dates(date_id),
                group_id INT REFERENCES dim_disaster_groups(group_id),
                type_id INT REFERENCES dim_disaster_types(type_id),
                name_id INT REFERENCES dim_disaster_names(name_id),
                location_id INT REFERENCES dim_locations(location_id),
                duration INT,
                origin VARCHAR,
                associated_dis_id INT REFERENCES dim_associated_distructions(associated_dis_id),
                OFDA_resp_id INT REFERENCES dim_ofda_responses(OFDA_resp_id),
                appeal_id INT REFERENCES dim_appeals(appeal_id),
                declaration_id INT REFERENCES dim_declarations(declaration_id),
                aid_contribution INT,
                dis_mag_value INT,
                dis_mag_scale_id INT REFERENCES dim_mag_scales(dis_mag_scale_id),
                total_deaths INT,
                no_injured INT,
                no_affected INT,
                no_homeless INT,
                total_affected INT,
                insured_damages DOUBLE PRECISION,
                total_damages DOUBLE PRECISION,
                cpi DOUBLE PRECISION,
                adm_level_id INT REFERENCES dim_adm_levels(adm_level_id)
            );
            """),
)


def create_disaster_tables(conn):
    """
    Create the disaster warehouse tables (dimensions, then the fact table).

    Every statement is ``CREATE TABLE IF NOT EXISTS``, so the function can be
    re-run safely. Table creation is best-effort per table: a failure is logged
    and the remaining tables are still attempted. Each statement runs inside
    its own savepoint because PostgreSQL aborts the whole transaction after a
    failed statement; without the savepoint every later statement would fail
    too and the final commit would silently discard all the work.

    Args:
        conn: An open DB-API connection (as returned by ``connect_db``), or
            ``None``. ``None`` is logged and ignored.

    Returns:
        None. Outcomes are reported through the log only.
    """
    if conn is None:
        logging.error("Cannot create tables: no database connection")
        return

    # Savepoints are only valid inside a transaction block; skip them when the
    # connection runs in autocommit mode (each statement is then independent).
    use_savepoints = not getattr(conn, "autocommit", False)
    savepoint = "create_disaster_table"

    cur = None
    failed_tables = []
    try:
        cur = conn.cursor()

        for table_name, ddl in _DISASTER_TABLE_DDL:
            logging.info(f"Creating table: {table_name}")
            if use_savepoints:
                cur.execute(f"SAVEPOINT {savepoint}")
            try:
                cur.execute(ddl)
            except Exception as e:
                logging.error(f"Error creating {table_name}: {e}")
                failed_tables.append(table_name)
                if use_savepoints:
                    # Undo only the failed statement so the transaction stays usable.
                    cur.execute(f"ROLLBACK TO SAVEPOINT {savepoint}")
            else:
                if use_savepoints:
                    cur.execute(f"RELEASE SAVEPOINT {savepoint}")
                logging.info(f"Table {table_name} created successfully.")

        conn.commit()
        if failed_tables:
            logging.error(
                f"Table creation finished with failures: {', '.join(failed_tables)}"
            )
        else:
            logging.info("All tables created successfully.")

    except Exception as e:
        logging.error(f"General error during table creation: {e}")
        try:
            conn.rollback()
        except Exception as rollback_error:
            # The connection itself may be broken; nothing more can be done here.
            logging.error(f"Rollback failed: {rollback_error}")

    finally:
        # cur stays None if conn.cursor() itself raised.
        if cur is not None:
            cur.close()

# Function to get data from PostgreSQL and load into a pandas DataFrame
#######################################################################
def get_data_from_db(query,conn):
    """
    Run ``query`` on ``conn`` and return the result as a pandas DataFrame.

    The connection is always closed before returning once a query has been
    attempted, so the caller must not reuse it afterwards.

    Args:
        query: SQL text passed to ``pandas.read_sql_query``.
        conn: An open database connection, or ``None``.

    Returns:
        The query result as a DataFrame, or ``None`` when ``conn`` is ``None``
        or the query fails (the reason is logged in both cases).
    """
    if conn is None:
        logging.error("Connection to database failed")
        return None

    result = None
    try:
        result = pd.read_sql_query(query, conn)
    except Exception as error:
        logging.error(f"Error fetching data: {error}")
    else:
        logging.info(f"Data fetched successfully for query: {query}")
    finally:
        # This function owns the connection's lifetime from here on.
        conn.close()
    return result

if __name__=="__main__":
    # Keep a handle on the warehouse connection so it can be closed once the
    # DDL has run. connect_db returns None on failure, in which case the table
    # creation step is skipped (the failure is already logged by connect_db).
    dwh_conn = connect_db('disasters_dwh')
    if dwh_conn is not None:
        try:
            create_disaster_tables(dwh_conn)
        finally:
            dwh_conn.close()

    # get_data_from_db closes the staging connection itself after the query.
    # `disasters` is None if the connection or the query failed.
    disasters=get_data_from_db("""--sql
                     SELECT * FROM staging_disasters;""",connect_db('staging_disasters'))

  df = pd.read_sql_query(query, conn)


In [177]:
# List the staging columns to decide which ones become dimensions.
disasters.columns

Index(['Year', 'Seq', 'Glide', 'Disaster Group', 'Disaster Subgroup',
       'Disaster Type', 'Disaster Subtype', 'Disaster Subsubtype',
       'Event Name', 'Country', 'ISO', 'Region', 'Continent', 'Location',
       'Origin', 'Associated Dis', 'Associated Dis2', 'OFDA Response',
       'Appeal', 'Declaration', 'Aid Contribution', 'Dis Mag Value',
       'Dis Mag Scale', 'Latitude', 'Longitude', 'Local Time', 'River Basin',
       'Start Year', 'Start Month', 'Start Day', 'End Year', 'End Month',
       'End Day', 'Total Deaths', 'No Injured', 'No Affected', 'No Homeless',
       'Total Affected', 'Insured Damages ('000 US$)',
       'Total Damages ('000 US$)', 'CPI', 'Adm Level', 'Admin1 Code',
       'Admin2 Code', 'Geo Locations', 'extraction_time'],
      dtype='object')

In [178]:
# Surrogate IDs I need to create:
##### Disaster Group + Disaster Subgroup: combine both columns into one hierarchy and assign IDs; work out a parent_id column first (before combining the columns)
##### Do the same for the disaster types and subtypes.
##### Create a date table from the minimum date to the maximum date, assign an ID to each date, and then join it to the fact table
##### Get the Associated Dis IDs, then combine them with Associated Dis2 and create a parent ID
##### Event name ID
##### Location ID for all the location fields
##### OFDA response ID
##### Appeal ID
##### Declaration ID
##### Dis Mag Scale ID
##### Adm level ID
##### Origin ID

In [179]:
def create_hierarchy(df, level_cols, id_col_name):
    """
    Build a parent/child dimension from a set of hierarchy columns.

    Every distinct non-null value of every level column becomes one node with
    an incremental ID (starting at 1, numbered level by level). A node's
    ``parent_id`` is the ID of the value found in the previous level column on
    the first row where the node appears; top-level nodes, and nodes whose
    parent value is missing, get ``parent_id = None``.

    Each row of the returned frame receives, in ``id_col_name``, the ID of its
    deepest non-null level (``None`` when all levels are missing).

    Nodes are tracked per (level, name), so a name reused at a deeper level
    (for example a type and a subtype both called "Drought") does not
    overwrite the parent's ID. Missing values (NaN/None) never become nodes.

    Limitation: a value is one node per level; if it occurs under several
    different parents, only the first-seen parent is recorded.

    Args:
        df: Source DataFrame. It is not modified.
        level_cols: Column names ordered from the top level to the deepest.
        id_col_name: Name of the ID column added to the returned frame.

    Returns:
        (hierarchy, df): ``hierarchy`` has object-dtype columns ``id``,
        ``name`` and ``parent_id``; ``df`` is a copy of the input with
        ``level_cols`` dropped and ``id_col_name`` added.
    """
    # Work on a copy so the caller's frame is not mutated as a side effect.
    df = df.copy()
    df[id_col_name] = None

    records = []     # one dict per node; turned into a DataFrame once at the end
    node_ids = {}    # (depth, name) -> id, so equal names on different levels stay distinct
    next_id = 1

    for depth, level in enumerate(level_cols):
        present = df[level].notna()

        if depth > 0:
            parent_col = level_cols[depth - 1]
            # First-seen parent value for every child value of this level.
            first_seen = df.loc[present, [level, parent_col]].drop_duplicates(subset=[level])
            parent_of = dict(zip(first_seen[level], first_seen[parent_col]))
        else:
            parent_of = {}

        for value in df.loc[present, level].unique():
            # Resolves to None for top-level nodes and for missing/unknown parents.
            parent_id = node_ids.get((depth - 1, parent_of.get(value)))

            records.append({'id': next_id, 'name': value, 'parent_id': parent_id})
            node_ids[(depth, value)] = next_id

            # Deeper levels run later, so they overwrite the shallower ID on rows
            # where the deeper level is populated.
            df.loc[df[level] == value, id_col_name] = next_id

            next_id += 1

    # dtype=object keeps IDs as plain ints and missing parents as None
    # (instead of floats with NaN).
    hierarchy = pd.DataFrame(records, columns=['id', 'name', 'parent_id'], dtype=object)
    df = df.drop(columns=level_cols)
    return hierarchy, df

In [180]:
# Build the three parent/child dimensions. Each call returns the hierarchy table
# plus `disasters` with the listed level columns replaced by a single ID column.
# Level columns are listed parent-first, because create_hierarchy looks up each
# node's parent in the column just before it.
dim_disaster_types, disasters = create_hierarchy(disasters, ['Disaster Type', 'Disaster Subtype', 'Disaster Subsubtype'],'type_id')
dim_disaster_groups ,disasters= create_hierarchy(disasters,['Disaster Group', 'Disaster Subgroup'],'group_id')
dim_associated_distructions, disasters=create_hierarchy(disasters,['Associated Dis', 'Associated Dis2'],'associated_dis_id')

In [181]:
def create_incremental_ids(df, column_names, id_column_name='location_id'):
    """
    Assign an incremental ID to every distinct combination of ``column_names``.

    Args:
        df: Source DataFrame.
        column_names: List of columns whose value combinations are numbered.
        id_column_name: Name of the generated ID column.

    Returns:
        (df, lookup): ``df`` with ``column_names`` replaced by
        ``id_column_name``; ``lookup`` holding each distinct combination
        (in order of first appearance) with its ID, starting at 1.
    """
    # Distinct key combinations in order of first appearance, renumbered from 0.
    lookup = df[column_names].drop_duplicates(ignore_index=True)
    lookup[id_column_name] = lookup.index + 1

    # Attach the ID to every source row, then drop the now-redundant key columns.
    keyed = df.merge(lookup, how='left', on=column_names)
    return keyed.drop(columns=column_names), lookup

In [182]:
# Build the flat dimensions: each call swaps the listed source column(s) in
# `disasters` for a surrogate key and returns the matching lookup table.
# NOTE(review): dim_locations here carries Country/ISO/Region/Continent/Location,
# while the dim_locations DDL declares country/country_code/city/state — confirm
# the column mapping before loading.
# NOTE(review): the last call produces origin_id, but the fact_disasters DDL
# declares `origin VARCHAR` and there is no origin dimension table — confirm
# which representation is intended.
disasters, dim_locations=create_incremental_ids(disasters,['Country', 'ISO', 'Region', 'Continent', 'Location','Latitude','Longitude'],'location_id')
disasters, dim_disaster_names=create_incremental_ids(disasters,['Event Name'],'name_id')
disasters, dim_ofda_responses=create_incremental_ids(disasters,['OFDA Response'],'OFDA_resp_id')
disasters, dim_appeals=create_incremental_ids(disasters,['Appeal'],'appeal_id')
disasters, dim_declarations=create_incremental_ids(disasters,['Declaration'],'declaration_id')
disasters, dim_mag_scales=create_incremental_ids(disasters,['Dis Mag Scale'],'dis_mag_scale_id')
disasters, dim_adm_levels=create_incremental_ids(disasters,['Adm Level'],'adm_level_id')
disasters, dim_disasters_origin=create_incremental_ids(disasters,['Origin'],'origin_id')

In [183]:
# Inspect the fact frame after the source columns have been replaced by surrogate keys.
disasters

Unnamed: 0,Year,Seq,Glide,Aid Contribution,Dis Mag Value,Local Time,River Basin,Start Year,Start Month,Start Day,...,group_id,associated_dis_id,location_id,name_id,OFDA_resp_id,appeal_id,declaration_id,dis_mag_scale_id,adm_level_id,origin_id
0,1902,12,,,8,20:20,,1902,4,18,...,2,1,1,1,1,1,1,1,1,1
1,1902,3,,,,,,1902,4,8,...,2,,2,2,1,1,1,2,1,1
2,1902,10,,,,,,1902,10,24,...,2,,2,2,1,1,1,2,1,1
3,1903,6,,,,,,1903,4,29,...,2,,3,1,1,1,1,2,1,1
4,1903,12,,,,,,1903,,,...,2,,4,3,1,2,2,2,1,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
32243,2021,449,FL-2021-000110,,,,,2021,7,16,...,4,,13533,1,1,1,1,4,1,6
32244,2021,75,,,,,,2021,2,1,...,4,,13534,1,1,1,1,4,4,6
32245,2021,599,EP-2021-000138,,,,,2021,9,7,...,5,,13535,1051,1,1,3,5,1,1
32246,2021,20,,,,,,2021,1,11,...,4,,13536,1,1,1,3,4,3,6


In [187]:
def generate_date_ids(df, start_date_col, end_date_col, id_name='date_id'):
    """
    Build a daily date dimension and attach date IDs to a DataFrame.

    The dimension covers every calendar day from the earliest to the latest
    date found in either column, numbered from 1. The returned frame keeps both
    date columns (converted to datetimes) and gains two ID columns named
    ``f'{start_date_col}_id'`` and ``f'{end_date_col}_id'``.

    Missing dates are ignored when computing the range and get a missing ID.
    Any time-of-day component is ignored when looking up the ID. If neither
    column contains a date, the dimension is empty and all IDs are missing.

    Args:
        df: Source DataFrame. It is not modified; the row index is kept.
        start_date_col: Name of the column holding start dates.
        end_date_col: Name of the column holding end dates.
        id_name: Name of the ID column in the date dimension.

    Returns:
        (df, date_dimension): the augmented copy of ``df`` and a DataFrame with
        columns ``'Date'`` and ``id_name``.
    """
    # Work on a copy so the caller's frame is not mutated as a side effect.
    df = df.copy()
    df[start_date_col] = pd.to_datetime(df[start_date_col])
    df[end_date_col] = pd.to_datetime(df[end_date_col])

    # Day-level keys: strip any time component so values match the daily calendar.
    start_days = df[start_date_col].dt.normalize()
    end_days = df[end_date_col].dt.normalize()

    # Pool both columns and drop NaT first. Python's min()/max() over values
    # that may be NaT is order-dependent and can return NaT.
    known_days = pd.concat([start_days, end_days], ignore_index=True).dropna()
    if known_days.empty:
        calendar = pd.DatetimeIndex([])
    else:
        calendar = pd.date_range(start=known_days.min(), end=known_days.max(), freq='D')

    date_dimension = pd.DataFrame({'Date': calendar})
    date_dimension[id_name] = range(1, len(date_dimension) + 1)

    # Look the IDs up through a day -> id mapping; days outside the calendar
    # (i.e. missing dates) map to NaN.
    day_to_id = pd.Series(date_dimension[id_name].to_numpy(), index=calendar)
    df[f'{start_date_col}_id'] = start_days.map(day_to_id)
    df[f'{end_date_col}_id'] = end_days.map(day_to_id)

    return df, date_dimension

Updated DataFrame:
  starting_date ending_date  starting_date_id  ending_date_id
0    2021-01-15  2021-01-20                 1               6
1    2022-02-20  2022-02-25               402             407
2    2023-03-25  2023-04-01               800             807
3    2021-01-15  2021-01-25                 1              11

Date Dimension Table:
          Date  date_id
0   2021-01-15        1
1   2021-01-16        2
2   2021-01-17        3
3   2021-01-18        4
4   2021-01-19        5
..         ...      ...
802 2023-03-28      803
803 2023-03-29      804
804 2023-03-30      805
805 2023-03-31      806
806 2023-04-01      807

[807 rows x 2 columns]


In [188]:
# Inspect the appeals lookup; a missing 'Appeal' value gets its own ID.
dim_appeals

Unnamed: 0,Appeal,appeal_id
0,,1
1,No,2
2,Yes,3


In [189]:
# Inspect the admin-level lookup.
# NOTE(review): 'Adm Level' contains the composite value '1;2', while the
# dim_adm_levels DDL declares adm_level as INT — confirm the column type before loading.
dim_adm_levels

Unnamed: 0,Adm Level,adm_level_id
0,,1
1,2,2
2,1,3
3,1;2,4


In [190]:
# Inspect the magnitude-scale lookup.
# NOTE(review): the matching DDL column is spelled `dis_mag_scalle` — confirm the
# intended name before mapping 'Dis Mag Scale' onto it.
dim_mag_scales

Unnamed: 0,Dis Mag Scale,dis_mag_scale_id
0,Richter,1
1,,2
2,Kph,3
3,Km2,4
4,Vaccinated,5
5,°C,6


In [191]:
# Inspect the OFDA response lookup; a missing value gets its own ID.
dim_ofda_responses

Unnamed: 0,OFDA Response,OFDA_resp_id
0,,1
1,Yes,2
