In [1]:
import os
import time
import json
import googlemaps
import numpy as np
import pandas as pd
from google.cloud import bigquery
from access_credentials.gcp_credentials import GOOGLE_MAPS_KEY

In [2]:
# Point the Google Cloud client libraries (used by bigquery.Client below) at the
# project's service-account key file, but do not override a credential path the
# user has already configured in the environment.
os.environ.setdefault('GOOGLE_APPLICATION_CREDENTIALS', r"access_credentials/rsp-dm-ii-dv-iii-elt-flow.json")

In [3]:
# Load the raw EM-DAT natural-disaster export from its 'EM-DAT Data' sheet.
df = pd.read_excel(
    io='Datasets/RawDatasets/public_emdat_natural.xlsx',
    sheet_name='EM-DAT Data',
)

In [4]:
# Inspect column names, non-null counts and dtypes of the raw export.
pd.DataFrame.info(df)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2104 entries, 0 to 2103
Data columns (total 46 columns):
 #   Column                                     Non-Null Count  Dtype  
---  ------                                     --------------  -----  
 0   DisNo.                                     2104 non-null   object 
 1   Historic                                   2104 non-null   object 
 2   Classification Key                         2104 non-null   object 
 3   Disaster Group                             2104 non-null   object 
 4   Disaster Subgroup                          2104 non-null   object 
 5   Disaster Type                              2104 non-null   object 
 6   Disaster Subtype                           2104 non-null   object 
 7   External IDs                               213 non-null    object 
 8   Event Name                                 310 non-null    object 
 9   ISO                                        2104 non-null   object 
 10  Country                 

In [5]:
# Creating CSV for the CPI Table
#
# One row per distinct 'Start Year'. The CPI value is the first NON-NULL 'CPI'
# reported for that year (previously the first row was taken even when it was
# NaN, discarding a real value present on later rows of the same year). Years
# with no CPI at all fall back to DEFAULT_CPI_VALUE. Rows are ordered by year.

DEFAULT_CPI_VALUE = 128.11  # NOTE(review): fallback CPI for years without one — confirm its source/base year

cpi_by_year = df.groupby('Start Year', sort=True)['CPI'].first()

cpi_df = pd.DataFrame({
    'CPICode': [f'C_{year}' for year in cpi_by_year.index],
    'Year': cpi_by_year.index.to_numpy(),
    'Value': cpi_by_year.to_numpy(),
})
# Only the CPI value can be missing; don't blanket-fill every column.
cpi_df['Value'] = cpi_df['Value'].fillna(DEFAULT_CPI_VALUE)
cpi_df.to_csv('Datasets/CleanedDatasets/CPI.csv', sep='|', index=False)

In [6]:
# Creating CSV for the Country Table
# One row per distinct (ISO, Region, Subregion, Country) combination, ordered by ISO code.
country_df = (
    df.loc[:, ['ISO', 'Region', 'Subregion', 'Country']]
    .drop_duplicates()
    .rename(columns={'ISO': 'ISOCode'})
    .sort_values('ISOCode')
)
country_df.to_csv('Datasets/CleanedDatasets/Country.csv', sep='|', index=False)

In [7]:
# Creating CSV for the ExternalReqRes Table
#
# Enumerates every OFDA / Appeal / Declaration 0-1 flag combination (8 rows).
# IDs follow nested order (OFDA outermost, Declaration innermost), so
# ExternalReqResId == OFDA*4 + Appeal*2 + Declaration + 1.

flag_values = [0, 1]
flag_combinations = [
    (ofda, appeal, declaration)
    for ofda in flag_values
    for appeal in flag_values
    for declaration in flag_values
]

external_req_res_df = pd.DataFrame([
    {'ExternalReqResId': position, 'OFDA': ofda, 'Appeal': appeal, 'Declaration': declaration}
    for position, (ofda, appeal, declaration) in enumerate(flag_combinations, start=1)
])
external_req_res_df.to_csv('Datasets/CleanedDatasets/ExternalReqRes.csv', sep='|', index=False)

In [8]:
# Creating CSV for DisasterClassification Table
#
# Distinct classification rows from the raw export; the raw 'Magnitude Scale'
# column is exposed as the classification's 'Unit'. Ordered by classification key.

classification_column_map = {
    'Classification Key': 'ClassificationKey',
    'Disaster Group': 'Group',
    'Disaster Subgroup': 'Subgroup',
    'Disaster Type': 'Type',
    'Disaster Subtype': 'Subtype',
    'Magnitude Scale': 'Unit',
}
disaster_classification = (
    df[list(classification_column_map)]
    .drop_duplicates()
    .rename(columns=classification_column_map)
    .sort_values('ClassificationKey')
)
disaster_classification.to_csv('Datasets/CleanedDatasets/DisasterClassification.csv', sep='|', index=False)

In [9]:
# Creating CSV for AssociateType Table

def replace_associate_type(assoc_types):
    """Shorten verbose associated-type labels inside a pipe-joined string.

    The replacements are plain substring substitutions applied in a fixed order.
    Non-string inputs (e.g. NaN for a missing cell) are returned untouched.
    """
    if isinstance(assoc_types, str):
        for verbose_label, short_label in (
            ('Snow/ice', 'Snow'),
            ('Avalanche (Snow, Debris)', 'Avalanche'),
            ('Broken Dam/Burst bank', 'Burst dam or bank'),
            ('Tsunami/Tidal wave', 'Tidal wave'),
            ('Slide (land, mud, snow, rock)', 'Land slide'),
        ):
            assoc_types = assoc_types.replace(verbose_label, short_label)
    return assoc_types

df['Associated Types'] = df['Associated Types'].apply(replace_associate_type)

# Collect every distinct associated type from the pipe-separated cells
# (missing / non-string cells are ignored), then number them 1..N alphabetically.
unique_associated_types = sorted({
    assoc_type
    for joined_types in df['Associated Types'].unique()
    if isinstance(joined_types, str)
    for assoc_type in joined_types.split("|")
})

associate_type_df = pd.DataFrame([
    {'AssociateTypeId': type_id, 'AssociateType': type_name}
    for type_id, type_name in enumerate(unique_associated_types, start=1)
]).sort_values('AssociateTypeId')
associate_type_df.to_csv('Datasets/CleanedDatasets/AssociateType.csv', sep='|', index=False)

# Creating CSV for DisasterAssociate Table (Which is a connection between Disaster and AssociateType Table)
#
# Each disaster's 'Associated Types' cell is a pipe-separated list. A disaster is
# linked to an associate type only when that type appears as a whole token of the
# list. The previous substring test (`type in cell`) also produced false links
# whenever one type name is contained in another (e.g. a type 'Storm' would match
# a cell holding only 'Storm surge').

associate_type_df = pd.read_csv('Datasets/CleanedDatasets/AssociateType.csv', sep='|')

# One pass over the disasters: group disaster numbers by associate type, keeping
# the disaster order within each type and linking each (disaster, type) pair once.
disasters_by_type = {}
has_types = df['Associated Types'].notna()
for dis_no, joined_types in zip(df.loc[has_types, 'DisNo.'], df.loc[has_types, 'Associated Types']):
    for assoc_type in dict.fromkeys(str(joined_types).split("|")):
        disasters_by_type.setdefault(assoc_type, []).append(dis_no)

# IDs are assigned type-by-type (in AssociateType table order), then by disaster order.
disaster_associate_list = list()
for associate_type_id, associate_type in zip(associate_type_df['AssociateTypeId'], associate_type_df['AssociateType']):
    for dis_no in disasters_by_type.get(associate_type, []):
        disaster_associate_list.append({
            'DisasterAssociateId': len(disaster_associate_list) + 1,
            'DisasterNo': dis_no,
            'AssociateTypeId': associate_type_id,
        })

disaster_associate = pd.DataFrame(disaster_associate_list)
disaster_associate.to_csv('Datasets/CleanedDatasets/DisasterAssociate.csv', sep='|', index=False)

In [10]:
# Adding Newly Found Locations into the df-dataframe.
#
# Missing 'Location' values in df are filled from location_filled.csv (keyed by
# 'DisNo.'); locations already present in df are kept. This is the net effect of
# the earlier two-step merge, done as a lookup so that a DisNo. appearing more
# than once in the CSV can no longer duplicate disaster rows (the first non-null
# location for that DisNo. wins).
location_dataframe = pd.read_csv('Datasets/IntermediateDatasets/location_filled.csv', sep=";")

filled_location_by_dis_no = (
    location_dataframe
    .dropna(subset=['Location'])
    .drop_duplicates(subset='DisNo.', keep='first')
    .set_index('DisNo.')['Location']
)
df['Location'] = df['Location'].fillna(df['DisNo.'].map(filled_location_by_dis_no))

# Using Google Geocoding API to find out the Lat and Lon of the Locations
#
# Results are cached as one JSON file per disaster number, so re-running this
# cell only queries the API for disasters not geocoded yet. Rows without a
# Location are skipped: the Location table only reads rows that have one, so
# querying "(nan) in <Country>" just spent API quota. Failures are best-effort:
# they are reported and that disaster simply gets no cache file.

gmaps = googlemaps.Client(key=GOOGLE_MAPS_KEY)

geocode_cache_dir = 'Datasets/IntermediateDatasets/GeocodedJsonFiles'
os.makedirs(geocode_cache_dir, exist_ok=True)

for _, row in df[df['Location'].notna()].iterrows():
    dis_no = row['DisNo.']
    file_path = f'{geocode_cache_dir}/{dis_no}.json'

    if os.path.exists(file_path):
        continue

    try:
        geocode_result = gmaps.geocode(f"({row['Location']}) in {row['Country']}")
        # Serialize before opening the file so a failure cannot leave an empty cache file.
        payload = json.dumps(geocode_result)
        with open(file_path, 'w') as file:
            file.write(payload)
    except Exception as e:
        print(f"Geocoding failed for {dis_no}: {e}")

# Creating CSV for the Location Table
#
# One row per cached geocoding candidate: a disaster whose location resolved to
# several addresses gets several Location rows. Disasters with no cached result
# (e.g. the best-effort geocoding call failed) are reported and skipped instead
# of aborting the whole table.

geocode_cache_dir = 'Datasets/IntermediateDatasets/GeocodedJsonFiles'

location_details_list = list()
for dis_no in df.loc[df['Location'].notna(), 'DisNo.']:
    file_path = f'{geocode_cache_dir}/{dis_no}.json'
    if not os.path.exists(file_path):
        print(f"No geocoding result cached for {dis_no}; skipping.")
        continue

    with open(file_path) as file:
        geo_coding = json.load(file)

    for loc in geo_coding:
        location_details_list.append({
            'LocationID': len(location_details_list) + 1,
            'DisasterNo': dis_no,
            'Location': loc['formatted_address'],
            'Latitude': loc['geometry']['location']['lat'],
            'Longitude': loc['geometry']['location']['lng'],
        })

location_details_dataframe = pd.DataFrame(location_details_list)
location_details_dataframe.to_csv('Datasets/CleanedDatasets/Location.csv', sep='|', index=False)

In [11]:
# Reload the ExternalReqRes lookup table written earlier; fetch_ext_req_res_code reads it.
ext_req_res_df = pd.read_csv(
    filepath_or_buffer='Datasets/CleanedDatasets/ExternalReqRes.csv',
    sep='|',
)

def fetch_ext_req_res_code(row):
    """Return the ExternalReqResId matching a disaster's response flags.

    `row` is an ``(index, Series)`` pair as yielded by ``DataFrame.iterrows()``.
    'OFDA Response', 'Appeal' and 'Declaration' each count as set only when equal
    to 'Yes'; any other value (including NaN) counts as unset. The flags are
    matched against the module-level ``ext_req_res_df`` lookup table.
    Raises IndexError if no lookup row matches.
    """
    record = row[1]
    matches = (
        (ext_req_res_df['OFDA'] == (record['OFDA Response'] == 'Yes'))
        & (ext_req_res_df['Appeal'] == (record['Appeal'] == 'Yes'))
        & (ext_req_res_df['Declaration'] == (record['Declaration'] == 'Yes'))
    )
    return ext_req_res_df.loc[matches, 'ExternalReqResId'].values[0]

# Build the Disaster table: one row per raw record. Each output field is either
# copied from a raw column (string value) or computed from the iterrows()
# (index, Series) pair (callable value). Field order matches the output columns.
disaster_field_sources = {
    'DisasterNum': 'DisNo.',
    'ClassificationKey': 'Classification Key',
    'ISOCode': 'ISO',
    'ExternalReqResId': fetch_ext_req_res_code,
    'EventName': 'Event Name',
    'RiverBasin': 'River Basin',
    'DisasterOrigin': 'Origin',
    'DisasterMagnitude': 'Magnitude',
    'AidContribution': "AID Contribution ('000 US$)",
    'StartYear': 'Start Year',
    'StartMonth': 'Start Month',
    'StartDay': 'Start Day',
    'EndYear': 'End Year',
    'EndMonth': 'End Month',
    'EndDay': 'End Day',
    'TotalDeaths': 'Total Deaths',
    'NumInjured': 'No. Injured',
    'NumAffected': 'No. Affected',
    'NumHomeless': 'No. Homeless',
    'TotalAffected': 'Total Affected',
    'ReconstructionCost': "Reconstruction Costs ('000 US$)",
    'ReconstructionCostAdj': "Reconstruction Costs, Adjusted ('000 US$)",
    'InsuredDamage': "Insured Damage ('000 US$)",
    'InsuredDamageAdj': "Insured Damage, Adjusted ('000 US$)",
    'TotalDamage': "Total Damage ('000 US$)",
    'TotalDamageAdj': "Total Damage, Adjusted ('000 US$)",
    # Must match the CPICode format used when building the CPI table.
    'CPICode': lambda row: f'C_{row[1]["Start Year"]}',
    'EntryDate': 'Entry Date',
    'UpdatedDate': 'Last Update',
}

disaster_list = []
for disaster_id, row in enumerate(df.iterrows(), start=1):
    record = {'DisasterId': disaster_id}
    for target_field, source in disaster_field_sources.items():
        record[target_field] = source(row) if callable(source) else row[1][source]
    disaster_list.append(record)

disaster_df = pd.DataFrame(disaster_list)
disaster_df.to_csv('Datasets/CleanedDatasets/Disaster.csv', sep='|', index=False)

In [12]:
# Manual guard: every load below truncates (replaces) the target BigQuery table.
user_input = input("To confirm re-creation of tables type -> RECREATE TABLES: ")

if user_input == "RECREATE TABLES":
    client = bigquery.Client()

    # The cleaned CSVs are written with sep='|' and a header row; declare the
    # delimiter explicitly rather than relying on auto-detection.
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        field_delimiter="|",
        skip_leading_rows=1,
        autodetect=True,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )

    dataset_id = f"{client.project}.eu_disaster"

    try:
        dataset = client.get_dataset(dataset_id)
    except Exception:
        # Presumably the dataset does not exist yet; if the failure was something
        # else (auth, network), create_dataset will raise the real error.
        dataset = bigquery.Dataset(dataset_id)
        dataset = client.create_dataset(dataset, timeout=30)

    cleaned_dir = "Datasets/CleanedDatasets/"
    # Only CSV files become tables; stray files (.DS_Store, .gitkeep, ...) would
    # otherwise yield bogus table names and a failing open().
    table_names = sorted(
        os.path.splitext(file_name)[0]
        for file_name in os.listdir(cleaned_dir)
        if file_name.endswith(".csv")
    )

    for table in table_names:
        table_id = f"{dataset_id}.{table}"
        table_file_path = f"{cleaned_dir}{table}.csv"

        with open(table_file_path, "rb") as source_file:
            job = client.load_table_from_file(source_file, table_id, job_config=job_config)
        # Wait for the load job so failures raise here instead of being silently lost.
        job.result()
        print(f"Loaded {job.output_rows} rows into {table_id}")

To confirm re-creation of tables type -> RECREATE TABLES:  
