In [1]:
import pandas as pd
import requests
import io
import json

# Mage Data Loader

In [None]:
import io
import pandas as pd
import requests
import json
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data_from_api(*args, **kwargs):
    """
    Template for loading data from API
    """
    url1 = 'https://storage.googleapis.com/uber-data-engineering-project-0912/nyc-2023-06-30-data.csv'
    url2 = 'https://storage.googleapis.com/uber-data-engineering-project-0912/taxi%2B_zone_lookup.csv'

    url_list = [url1, url2]

    data_dict = {}
    for url in url_list:
        response = requests.get(url)
        data = response.text
        data_dict[url_list.index(url)] = data
        data_json = json.dumps(data_dict, indent=4)


    return data_json


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'


# Mage Data Transformer

In [None]:
import pandas as pd
import json
import io
if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@transformer
def transform(data, *args, **kwargs):
    """
    Template code for a transformer block.

    Add more parameters to this function if this block has multiple parent blocks.
    There should be one parameter for each output variable from each parent block.

    Args:
        data: The output from the upstream parent block
        args: The output from any additional upstream blocks (if applicable)

    Returns:
        Anything (e.g. data frame, dictionary, array, int, str, etc.)
    """
    # Specify your transformation logic here
    data_reload = json.loads(data)
    
    # load taxi_data into dataframe
    taxi_data = pd.read_csv(io.StringIO(data_reload['0']), sep=',')
    taxi_data.drop('Unnamed: 0', axis=1, inplace=True)
    taxi_data['tpep_pickup_datetime'] = pd.to_datetime(taxi_data['tpep_pickup_datetime'])
    taxi_data['tpep_dropoff_datetime'] = pd.to_datetime(taxi_data['tpep_dropoff_datetime'])

    # create rate_dim table
    rate_code_category = {
        1 : "Standard rate",
        2 : "JFK",
        3 : "Newark",
        4 : "Nassau or Westchester",
        5 : "Negotiated fare",
        6 : "Group ride"
    }

    rate_dim = taxi_data[['RatecodeID']].drop_duplicates().dropna().sort_values('RatecodeID').reset_index(drop=True)
    rate_dim['rate_id'] = rate_dim.index
    rate_dim['rate_category_name'] = rate_dim['RatecodeID'].map(rate_code_category)
    rate_dim = rate_dim[['rate_id','RatecodeID','rate_category_name']]
    

    # create store_and_fwd_dim table
    flag_category = {
        "Y" : "Store and forward trip",
        "N" : "Not a store and forward trip"
    }

    store_and_fwd_dim = taxi_data[['store_and_fwd_flag']].drop_duplicates().dropna().reset_index(drop=True)
    store_and_fwd_dim['flag_id'] = store_and_fwd_dim.index
    store_and_fwd_dim['flag_name'] = store_and_fwd_dim['store_and_fwd_flag'].map(flag_category)
    store_and_fwd_dim = store_and_fwd_dim[['flag_id','store_and_fwd_flag', 'flag_name']]


    # create payment_dim table
    payment_type = {
        1 : "Credit card",
        2 : "Cash",
        3 : "No charge",
        4 : "Dispute",
        5 : "Unknown",
        6 : "Voided trip"
    }

    payment_dim = taxi_data[['payment_type']].drop_duplicates().dropna().sort_values('payment_type').tail(5).reset_index(drop=True)
    payment_dim['payment_id'] = payment_dim.index
    payment_dim['payment_type_name'] = payment_dim['payment_type'].map(payment_type)
    payment_dim = payment_dim[['payment_id','payment_type','payment_type_name']]

    # create location_dim table
    location_dim = pd.read_csv(io.StringIO(data_reload['1']), sep=',')
    location_dim.drop('Unnamed: 0', axis=1, inplace=True)

    
    # return all dataframe by aggregating into a dictionary
    return {
        'taxi_data' : taxi_data.to_dict(orient="dict"),
        'rate_dim' : rate_dim.to_dict(orient="dict"),
        'store_and_fwd_dim' : store_and_fwd_dim.to_dict(orient="dict"),
        'payment_dim' : payment_dim.to_dict(orient="dict"),
        'location_dim' : location_dim.to_dict(orient="dict")
    }


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'


# Mage Data Exporter

In [None]:
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.bigquery import BigQuery
from mage_ai.io.config import ConfigFileLoader
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_big_query(data, **kwargs) -> None:
    """
    Template for exporting data to a BigQuery warehouse.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: https://docs.mage.ai/design/data-loading#bigquery
    """
    #table_id = "uber-data-project-398804.uber_data_project.taxi_data"
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    for key, value in data.items():
        table_id = key
        BigQuery.with_config(ConfigFileLoader(config_path, config_profile)).export(
            DataFrame(value),
            f"uber-data-project-398804.uber_data_project.{table_id}",
            if_exists='replace',  # Specify resolution policy if table name already exists
        )
