# Cloud Function for extraction and transformation of the data from the APIs

In [None]:
# Libraries imported
import json
import pandas as pd
import datetime as dt
from datetime import timedelta
import pandas_read_xml as pdx
from functools import reduce
from google.cloud import storage
from urllib.request import urlopen


#-----------------------------------Japan----------------------------------------------------------------------


def japan_api_to_gcs():
    """Extract recent events for the Japan bounding box from IRIS and store them in GCS.

    Queries the IRIS FDSN event service (QuakeML) for events since 00:00 of the
    previous calendar day (by the runtime's clock), uploads the extracted table
    to ``APIs/api_japan_<timestamp>.csv`` and the cleaned table to
    ``Transformed_Data/api_japan_<timestamp>.csv`` in ``seismic-data-bucket``.
    Both objects share the same timestamp.

    The transformed table has the columns
    ``time, place, mag, magType, depth, latitude, longitude`` with ``time``
    parsed to datetimes, numeric columns cast to float, ``place`` title-cased
    and ``depth`` converted from metres to kilometres.

    Returns:
        None. The function is called for its uploads only.
    """
    event_column = 'q:quakeml.0.eventParameters.event'
    output_columns = ['time', 'place', 'mag', 'magType', 'depth', 'latitude', 'longitude']

    def deep_get(dictionary, keys, default=None):
        """Follow a dotted key path through nested dicts; return ``default`` on any miss."""
        return reduce(
            lambda d, key: d.get(key, default) if isinstance(d, dict) else default,
            keys.split("."),
            dictionary,
        )

    # Calling the Japan API

    current_date = str(dt.datetime.now().date() - timedelta(days=1)) + 'T00:00:00'

    df = pdx.read_xml(f'http://service.iris.edu/fdsnws/event/1/query?starttime={current_date}&orderby=time&format=xml&maxlat=45.540717058168504&minlon=129.20644141355123&maxlon=146.68542509079882&minlat=30.180889170292048')

    # Round-trip through JSON so json_normalize can flatten the nested XML structure.
    data = pd.json_normalize(json.loads(df.to_json()))

    if event_column in data.columns:
        # The event column holds list(s) of event dicts: flatten them and pull
        # the wanted fields out of each event by dotted path.
        events = [event for entry in data[event_column] for event in entry]

        field_paths = {
            'type': 'type',
            'place': 'description.text',
            'time': 'origin.time.value',
            'author': 'origin.creationInfo.author',
            'latitude': 'origin.latitude.value',
            'longitude': 'origin.longitude.value',
            'depth': 'origin.depth.value',
            'mag': 'magnitude.mag.value',
            'magType': 'magnitude.type',
        }
        df_api_japan = pd.DataFrame({
            column: [deep_get(event, path) for event in events]
            for column, path in field_paths.items()
        })
    else:
        # The event fields were flattened straight into columns
        # (presumably the single-event response - TODO confirm).
        flattened_columns = {
            f'{event_column}.description.text': 'place',
            f'{event_column}.origin.time.value': 'time',
            f'{event_column}.origin.latitude.value': 'latitude',
            f'{event_column}.origin.longitude.value': 'longitude',
            f'{event_column}.origin.depth.value': 'depth',
            f'{event_column}.magnitude.mag.value': 'mag',
            f'{event_column}.magnitude.type': 'magType',
        }
        # Non-inplace rename: renaming a slice of `data` in place triggers
        # SettingWithCopyWarning and is fragile across pandas versions.
        df_api_japan = (
            data[list(flattened_columns)]
            .rename(columns=flattened_columns)[output_columns]
        )  # Data extracted

    # Saving data to Cloud Storage

    current_time = str(dt.datetime.now().strftime('%Y_%m_%d_%H_%M_%S'))

    # NOTE(review): 'Seismic Alert System' looks like a display name rather than
    # a project ID - confirm against the GCP project settings.
    client = storage.Client(project='Seismic Alert System')
    bucket = client.get_bucket('seismic-data-bucket')  # reused for both uploads
    bucket.blob(f'APIs/api_japan_{current_time}.csv').upload_from_string(
        df_api_japan.to_csv(index=False), content_type='text/csv')

    # ---------------------------------- Transformation ----------------------------------

    # Transforming the Japan data (on a copy, so the extracted frame stays untouched)

    df_api_japan_t = df_api_japan[output_columns].copy()

    df_api_japan_t['time'] = pd.to_datetime(df_api_japan_t['time'])

    for column in ('latitude', 'longitude', 'depth', 'mag'):
        df_api_japan_t[column] = df_api_japan_t[column].astype(float)

    df_api_japan_t['place'] = df_api_japan_t['place'].str.title()

    # QuakeML depths are expressed in metres; convert to kilometres numerically.
    # The previous approach sliced the leading characters of str(depth), which
    # produced wrong values for depths under 10 km, for fractional metres and
    # for missing values ('na').
    df_api_japan_t['depth'] = df_api_japan_t['depth'] / 1000

    # Saving the transformed data to the Cloud Storage

    bucket.blob(f'Transformed_Data/api_japan_{current_time}.csv').upload_from_string(
        df_api_japan_t.to_csv(index=False), content_type='text/csv')


#-------------------------------------------------Chile---------------------------------------------------------------------


# Calling the Chile API  

def chile_api_to_cgs():
    """Extract the latest Chilean events from the chilealerta API and store them in GCS.

    Downloads the ``ultimos_sismos`` listing (limit 100, country Chile), uploads
    the flattened records to ``APIs/api_chile_<timestamp>.csv`` and the cleaned
    table to ``Transformed_Data/api_chile_<timestamp>.csv`` in
    ``seismic-data-bucket``. Both objects share the same timestamp.

    The transformed table has the columns
    ``time, place, mag, magType, depth, latitude, longitude`` with ``time``
    parsed to datetimes.

    Returns:
        None. The function is called for its uploads only.

    Raises:
        KeyError: if the response lacks one of the columns that are dropped,
            renamed or selected below.
    """
    url = 'https://chilealerta.com/api/query/?user=demo&select=ultimos_sismos&limit=100&country=Chile'
    # Context manager so the HTTP response is always closed, even if parsing fails.
    with urlopen(url) as response:
        data = json.load(response)
    df_api_chile = pd.json_normalize(data, record_path=['ultimos_sismos_Chile'])

    # Saving the data to the Cloud Storage

    current_time = str(dt.datetime.now().strftime('%Y_%m_%d_%H_%M_%S'))

    # NOTE(review): 'Seismic Alert System' looks like a display name rather than
    # a project ID - confirm against the GCP project settings.
    client = storage.Client(project='Seismic Alert System')
    bucket = client.get_bucket('seismic-data-bucket')  # reused for both uploads
    bucket.blob(f'APIs/api_chile_{current_time}.csv').upload_from_string(
        df_api_chile.to_csv(index=False), content_type='text/csv')

    # ---------------------------------- Transformation ----------------------------------

    # Transforming the Chile data. drop/rename return new frames, so the raw
    # frame is no longer mutated through an alias.

    df_api_chile_t = (
        df_api_chile
        .drop(columns=['state', 'local_time', 'chilean_time', 'id', 'url', 'source'])
        .rename(columns={'utc_time': 'time',
                         'reference': 'place',
                         'magnitude': 'mag',
                         'scale': 'magType'})
    )

    df_api_chile_t['time'] = pd.to_datetime(df_api_chile_t['time'])

    df_api_chile_t = df_api_chile_t[
        ['time', 'place', 'mag', 'magType', 'depth', 'latitude', 'longitude']
    ]  # Transformed data

    # Saving the transformed data to the Cloud Storage

    bucket.blob(f'Transformed_Data/api_chile_{current_time}.csv').upload_from_string(
        df_api_chile_t.to_csv(index=False), content_type='text/csv')


#--------------------------------------------------US----------------------------------------------------------------------

# Calling the US API

def us_api_to_cgs():
    """Extract the last hour of USGS events and store raw and US-only tables in GCS.

    Downloads the USGS ``all_hour`` GeoJSON feed, uploads the flattened features
    to ``APIs/api_us_<timestamp>.csv`` and the cleaned table to
    ``Transformed_Data/api_us_<timestamp>.csv`` in ``seismic-data-bucket``.
    Both objects share the same timestamp.

    The transformed table has the columns
    ``time, place, mag, magType, depth, latitude, longitude``: ``time`` is
    converted from epoch milliseconds, the coordinate triple is split into
    ``longitude``/``latitude``/``depth``, and rows whose ``place`` names
    Japan, Russia, Canada, Mexico or a sea are removed.

    Returns:
        None. The function is called for its uploads only.

    Raises:
        KeyError: if the feed has no features, or lacks an expected property.
    """
    url = 'https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.geojson'
    # Context manager so the HTTP response is always closed, even if parsing fails.
    with urlopen(url) as response:
        data = json.load(response)

    df_us_api = pd.json_normalize(data, record_path=['features'])

    # Saving the data in the Cloud Storage

    current_time = str(dt.datetime.now().strftime('%Y_%m_%d_%H_%M_%S'))

    # NOTE(review): 'Seismic Alert System' looks like a display name rather than
    # a project ID - confirm against the GCP project settings.
    client = storage.Client(project='Seismic Alert System')
    bucket = client.get_bucket('seismic-data-bucket')  # reused for both uploads
    bucket.blob(f'APIs/api_us_{current_time}.csv').upload_from_string(
        df_us_api.to_csv(index=False), content_type='text/csv')

    # ---------------------------------- Transformation ----------------------------------

    # Transforming the US data. Everything below builds new frames instead of
    # assigning into a slice of the raw frame (avoids SettingWithCopyWarning).

    df_api_us_t = df_us_api[
        ['properties.time', 'properties.place', 'properties.mag',
         'properties.magType', 'geometry.coordinates']
    ].rename(columns={'properties.time': 'time',
                      'properties.place': 'place',
                      'properties.mag': 'mag',
                      'properties.magType': 'magType',
                      'geometry.coordinates': 'coordinates'})

    # Each coordinates entry is a [longitude, latitude, depth] triple; keep the
    # source index so the join below lines the rows up explicitly.
    df_us_locations = pd.DataFrame(
        df_api_us_t['coordinates'].to_list(),
        columns=['longitude', 'latitude', 'depth'],
        index=df_api_us_t.index,
    )
    df_api_us_t = df_api_us_t.drop(columns='coordinates').join(df_us_locations)

    df_api_us_t['time'] = pd.to_datetime(df_api_us_t['time'], unit='ms')

    # Rows that refer to other countries or open sea. The lookbehind and the
    # word boundaries stop the filter from discarding US places such as
    # "New Mexico", "Searles Valley" or "Seattle", which the plain
    # 'Mexico'/'Sea' substrings matched. na=False treats a missing place as
    # "not foreign".
    # NOTE(review): the previous mask also contained a latitude/longitude test
    # (lat < 20.91 AND lat > 50.03, lon < -124.65 AND lon > -66.96) that could
    # never be true, so it filtered nothing and has been removed. If a
    # bounding-box filter is wanted it must be an "outside the box" OR test;
    # confirm intent first, since that box would also drop Alaska and Hawaii.
    is_foreign = df_api_us_t['place'].str.contains(
        r'Japan|Russia|Canada|(?<!New )Mexico|\bSea\b', na=False)

    df_api_us_t = df_api_us_t.loc[
        ~is_foreign,
        ['time', 'place', 'mag', 'magType', 'depth', 'latitude', 'longitude'],
    ]  # Transformed data

    # Saving the transformed data to the Cloud Storage

    bucket.blob(f'Transformed_Data/api_us_{current_time}.csv').upload_from_string(
        df_api_us_t.to_csv(index=False), content_type='text/csv')

def main(data, context):
    """Cloud Function entry point: run the Japan, Chile and US pipelines in order.

    ``data`` and ``context`` are the arguments supplied by the trigger; neither
    is used here. An exception raised by one pipeline propagates and prevents
    the following ones from running.
    """
    pipelines = (japan_api_to_gcs, chile_api_to_cgs, us_api_to_cgs)
    for run_pipeline in pipelines:
        run_pipeline()