# ETL MongoDB to BQ
We will build an ETL pipeline that transfers the MongoDB Atlas Airbnb sample data (`sample_airbnb`) to BigQuery.
>Source: https://www.mongodb.com/developer/products/atlas/atlas-sample-datasets/

In [2]:
# import modin.pandas as pd
import pandas as pd
import hashlib
from pathlib import Path
import os
from datetime import datetime
from pymongo import MongoClient
# Set-up google credentials
import time
from google.cloud import bigquery
# Display
pd.set_option("display.max_columns", None)
pd.set_option("expand_frame_repr", False)
# Prefect
from prefect_gcp import GcpCredentials
from prefect import task, flow
from prefect.blocks.system import Secret
from prefect_gcp.cloud_storage import GcsBucket
print('Setup Complete')  

Setup Complete


## Extract Sample AirBnb data from MongoDB

## Function get-mongo-client

In [3]:
# Build a MongoClient for the sample cluster.
# The connection URI of the Atlas sample dataset is public, but it is still
# read from a Prefect Secret block so the access key never appears in code.
@flow(log_prints=True, name="get-mongo-client")
def get_mongo_client():
    """Return a ``MongoClient`` connected with the URI stored in Prefect.

    The URI is read from the Prefect ``Secret`` block named
    ``"sample-mongodb-uri"``. The client is returned unopened-for-use and
    not closed here, so the caller owns it and is responsible for closing it.
    """
    uri = Secret.load("sample-mongodb-uri").get()
    return MongoClient(uri)

## Sample Datasets
>Extract the `sample_airbnb` database from the MongoDB Atlas sample datasets

# Function extract-collection

In [4]:
# Define a function to extract all the documents inside the collection
@flow(log_prints=True, name="extract-collection-from-mongodb")
def extract_collection(db_name: str, coll_name: str) -> pd.DataFrame:
    """Read every document of ``db_name.coll_name`` into a flat DataFrame.

    Nested sub-documents are flattened by ``pd.json_normalize`` with ``"_"``
    as the separator (e.g. ``host.host_id`` becomes ``host_host_id``).

    The MongoClient is closed once the cursor has been fully consumed, so
    repeated flow runs do not leak connections.
    """
    # MongoClient is a context manager; leaving the block calls close().
    with get_mongo_client() as mongo_client:
        db_coll = mongo_client[db_name].get_collection(coll_name)
        # Materialise the cursor while the client is still open.
        docs = list(db_coll.find({}))
    return pd.json_normalize(docs, sep="_")

# df_airbnb = extract_collection(db_name="sample_airbnb", coll_name="listingsAndReviews")
# df_airbnb.head()

Unnamed: 0,_id,listing_url,name,summary,space,description,neighborhood_overview,notes,transit,access,interaction,house_rules,property_type,room_type,bed_type,minimum_nights,maximum_nights,cancellation_policy,last_scraped,calendar_last_scraped,first_review,last_review,accommodates,bedrooms,beds,number_of_reviews,bathrooms,amenities,price,security_deposit,cleaning_fee,extra_people,guests_included,reviews,images_thumbnail_url,images_medium_url,images_picture_url,images_xl_picture_url,host_host_id,host_host_url,host_host_name,host_host_location,host_host_about,host_host_response_time,host_host_thumbnail_url,host_host_picture_url,host_host_neighbourhood,host_host_response_rate,host_host_is_superhost,host_host_has_profile_pic,host_host_identity_verified,host_host_listings_count,host_host_total_listings_count,host_host_verifications,address_street,address_suburb,address_government_area,address_market,address_country,address_country_code,address_location_type,address_location_coordinates,address_location_is_location_exact,availability_availability_30,availability_availability_60,availability_availability_90,availability_availability_365,review_scores_review_scores_accuracy,review_scores_review_scores_cleanliness,review_scores_review_scores_checkin,review_scores_review_scores_communication,review_scores_review_scores_location,review_scores_review_scores_value,review_scores_review_scores_rating,weekly_price,monthly_price,reviews_per_month
0,10006546,https://www.airbnb.com/rooms/10006546,Ribeira Charming Duplex,Fantastic duplex apartment with three bedrooms...,Privileged views of the Douro River and Ribeir...,Fantastic duplex apartment with three bedrooms...,"In the neighborhood of the river, you can find...",Lose yourself in the narrow streets and stairc...,Transport: • Metro station and S. Bento railwa...,We are always available to help guests. The ho...,"Cot - 10 € / night Dog - € 7,5 / night",Make the house your home...,House,Entire home/apt,Real Bed,2,30,moderate,2019-02-16 05:00:00,2019-02-16 05:00:00,2016-01-03 05:00:00,2019-01-20 05:00:00,8,3.0,5.0,51,1.0,"[TV, Cable TV, Wifi, Kitchen, Paid parking off...",80.0,200.0,35.0,15.0,6,"[{'_id': '58663741', 'date': 2016-01-03 05:00:...",,,https://a0.muscache.com/im/pictures/e83e702f-e...,,51399391,https://www.airbnb.com/users/show/51399391,Ana&Gonçalo,"Porto, Porto District, Portugal","Gostamos de passear, de viajar, de conhecer pe...",within an hour,https://a0.muscache.com/im/pictures/fab79f25-2...,https://a0.muscache.com/im/pictures/fab79f25-2...,,100.0,False,True,True,3,3,"[email, phone, reviews, jumio, offline_governm...","Porto, Porto, Portugal",,"Cedofeita, Ildefonso, Sé, Miragaia, Nicolau, V...",Porto,Portugal,PT,Point,"[-8.61308, 41.1413]",False,28,47,74,239,9.0,9.0,10.0,10.0,10.0,9.0,89.0,,,
1,10038496,https://www.airbnb.com/rooms/10038496,Copacabana Apartment Posto 6,"The Apartment has a living room, toilet, bedro...","The apartment has a living room, wash room, su...","The Apartment has a living room, toilet, bedro...",Copacabana in the South zone is the district t...,,On the street there is plenty of transport and...,todo o espaço.,Contact telephone numbers if needed: Valeria (...,Entreguem o imóvel conforme receberam e respei...,Apartment,Entire home/apt,Real Bed,3,75,strict_14_with_grace_period,2019-02-11 05:00:00,2019-02-11 05:00:00,2016-01-18 05:00:00,2019-01-28 05:00:00,4,1.0,3.0,70,2.0,"[TV, Cable TV, Internet, Wifi, Air conditionin...",119.0,600.0,150.0,40.0,3,"[{'_id': '60037020', 'date': 2016-01-18 05:00:...",,,https://a0.muscache.com/im/pictures/159d489e-6...,,51530266,https://www.airbnb.com/users/show/51530266,Ana Valéria,"Rio de Janeiro, State of Rio de Janeiro, Brazil",Professora de Educação Física formada pela uni...,within an hour,https://a0.muscache.com/im/pictures/8c7bb5fe-7...,https://a0.muscache.com/im/pictures/8c7bb5fe-7...,Copacabana,100.0,True,True,True,2,2,"[email, phone, reviews, jumio, government_id]","Rio de Janeiro, Rio de Janeiro, Brazil",Copacabana,Copacabana,Rio De Janeiro,Brazil,BR,Point,"[-43.190849194463404, -22.984339360067814]",False,7,19,33,118,10.0,10.0,10.0,10.0,10.0,10.0,98.0,,,
2,10066928,https://www.airbnb.com/rooms/10066928,3 chambres au coeur du Plateau,Notre appartement comporte 3 chambres avec cha...,"Notre logement est lumineux, plein de vie et c...",Notre appartement comporte 3 chambres avec cha...,"L'appartement se situe au coeur du Plateau, do...",,L'appartement se situe à égale distance des mé...,Le logement sera disponible en entier pour vot...,N'hésitez pas à m'écrire pour toute demande de...,Merci de respecter ce lieu de vie.,Apartment,Entire home/apt,Real Bed,1,1125,flexible,2019-03-11 04:00:00,2019-03-11 04:00:00,NaT,NaT,6,3.0,3.0,0,1.0,"[Internet, Wifi, Kitchen, Heating, Family/kid ...",140.0,,,0.0,1,[],,,https://a0.muscache.com/im/pictures/f208bdd7-b...,,9036477,https://www.airbnb.com/users/show/9036477,Margaux,"Montreal, Quebec, Canada",,,https://a0.muscache.com/im/users/9036477/profi...,https://a0.muscache.com/im/users/9036477/profi...,Le Plateau,,False,True,False,2,2,"[email, phone, reviews, work_email]","Montréal, Québec, Canada",Le Plateau-Mont-Royal,Le Plateau-Mont-Royal,Montreal,Canada,CA,Point,"[-73.57383, 45.52233]",True,0,0,0,0,,,,,,,,,,
3,10069642,https://www.airbnb.com/rooms/10069642,Ótimo Apto proximo Parque Olimpico,Apartamento próximo ao centro dos Jogos Olímpi...,,Apartamento próximo ao centro dos Jogos Olímpi...,,,,,,,Apartment,Entire home/apt,Real Bed,15,20,strict_14_with_grace_period,2019-02-11 05:00:00,2019-02-11 05:00:00,NaT,NaT,5,2.0,2.0,0,2.0,"[TV, Cable TV, Internet, Wifi, Air conditionin...",858.0,4476.0,112.0,75.0,1,[],,,https://a0.muscache.com/im/pictures/5b1f4beb-6...,,51670240,https://www.airbnb.com/users/show/51670240,Jonathan,"Resende, Rio de Janeiro, Brazil",,,https://a0.muscache.com/im/pictures/9a6839d9-9...,https://a0.muscache.com/im/pictures/9a6839d9-9...,,,False,True,False,1,1,"[email, phone, jumio, government_id]","Rio de Janeiro, Rio de Janeiro, Brazil",Recreio dos Bandeirantes,Recreio dos Bandeirantes,Rio De Janeiro,Brazil,BR,Point,"[-43.4311123147628, -23.00035792660916]",False,0,0,0,0,,,,,,,,,,
4,10021707,https://www.airbnb.com/rooms/10021707,Private Room in Bushwick,Here exists a very cozy room for rent in a sha...,,Here exists a very cozy room for rent in a sha...,,,,,,,Apartment,Private room,Real Bed,14,1125,flexible,2019-03-06 05:00:00,2019-03-06 05:00:00,2016-01-31 05:00:00,2016-01-31 05:00:00,1,1.0,1.0,1,1.5,"[Internet, Wifi, Air conditioning, Kitchen, Bu...",40.0,,,0.0,1,"[{'_id': '61050713', 'date': 2016-01-31 05:00:...",,,https://a0.muscache.com/im/pictures/72844c8c-f...,,11275734,https://www.airbnb.com/users/show/11275734,Josh,"New York, New York, United States",,,https://a0.muscache.com/im/users/11275734/prof...,https://a0.muscache.com/im/users/11275734/prof...,Bushwick,,False,True,True,1,1,"[email, phone, reviews, kba]","Brooklyn, NY, United States",Brooklyn,Bushwick,New York,United States,US,Point,"[-73.93615, 40.69791]",True,0,0,0,0,10.0,10.0,10.0,10.0,8.0,8.0,100.0,,,


In [None]:
# df_airbnb.loc[~df_airbnb["monthly_price"].isna()]

In [5]:
# df_airbnb.info(memory_usage="deep")

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5555 entries, 0 to 5554
Data columns (total 77 columns):
 #   Column                                     Non-Null Count  Dtype         
---  ------                                     --------------  -----         
 0   _id                                        5555 non-null   object        
 1   listing_url                                5555 non-null   object        
 2   name                                       5555 non-null   object        
 3   summary                                    5555 non-null   object        
 4   space                                      5555 non-null   object        
 5   description                                5555 non-null   object        
 6   neighborhood_overview                      5555 non-null   object        
 7   notes                                      5555 non-null   object        
 8   transit                                    5555 non-null   object        
 9   access             

In [None]:
# df_airbnb.columns.to_list()

#### df-reviews

In [None]:
# # Exploring Reviews column
# df_reviews = pd.json_normalize(df_airbnb[["reviews"]].explode(column="reviews").reviews)
# df_reviews.head()

In [None]:
# df_reviews.info()

In [None]:
# df_airbnb.head(n=2)

# Function tweak-df

In [None]:
def tweak_df(df: pd.DataFrame) -> pd.DataFrame:
    """Coerce the flattened Airbnb listings frame into upload-friendly dtypes.

    Uploading the raw ``json_normalize`` output to GCS/BigQuery fails on
    object columns holding BSON values (e.g. ``Decimal128`` prices). The
    conversion therefore runs in these steps:

    1. every ``object`` column is stringified;
    2. every non-boolean numeric column is downcast with
       ``pd.to_numeric(..., downcast="float")``;
    3. count-like columns are cast back to ``int`` and money/rate columns
       (stringified in step 1) are parsed to ``float``;
    4. two audit columns are appended: ``_record_hash`` (SHA-1 hex digest of
       ``_id``) and ``_bq_inserted_at`` (local wall-clock time of this call,
       identical for every row).

    The input frame is not modified; a converted copy is returned.

    Raises:
        KeyError: if one of the expected columns is missing.
        ValueError: if a listed column cannot be cast (e.g. NaN in an int column).
    """
    # Work on a copy so the caller's raw frame keeps its original dtypes.
    df = df.copy()

    for col in df.columns:
        series = df[col]
        if series.dtype == "object":
            df[col] = series.astype(str)
        elif pd.api.types.is_numeric_dtype(series) and not pd.api.types.is_bool_dtype(
            series
        ):
            # The column is already numeric, so there is nothing to fail on;
            # the deprecated ``errors="ignore"`` argument is therefore omitted.
            df[col] = pd.to_numeric(series, downcast="float")

    # Step 3 of the docstring: counts back to int, money/rates to float.
    int_columns = [
        "accommodates",
        "number_of_reviews",
        "host_host_listings_count",
        "host_host_total_listings_count",
        "availability_availability_30",
        "availability_availability_60",
        "availability_availability_90",
        "availability_availability_365",
    ]
    float_columns = [
        "price",
        "security_deposit",
        "cleaning_fee",
        "extra_people",
        "weekly_price",
        # Must be parsed from its own column, not copied from weekly_price.
        "monthly_price",
        "host_host_response_rate",
    ]
    df = df.astype(
        {**dict.fromkeys(int_columns, int), **dict.fromkeys(float_columns, float)}
    )

    # Deterministic per-document hash plus one load timestamp for the batch;
    # these are used as references when deduplicating in BigQuery.
    df = df.assign(
        _record_hash=[
            hashlib.sha1(str(doc_id).encode("utf-8")).hexdigest()
            for doc_id in df["_id"]
        ],
        _bq_inserted_at=datetime.now(),
    )

    return df


# df_ = tweak_df(df_airbnb)
# df_.head()

#  amenities=pd.DataFrame(df["amenities"].tolist())


In [None]:
# df_.info()

# Function write-to-gcs

In [None]:
# Convert camelCase to snake_case (the function keeps its historical "split_case" name)
def convert_to_split_case(text: str) -> str:
    """Convert a camelCase identifier to snake_case.

    An underscore is inserted before every uppercase character except the
    first one. The result is lower-cased and trailing underscores are
    stripped, e.g. ``"listingsAndReviews"`` -> ``"listings_and_reviews"``.
    A leading capital (PascalCase) does not produce a leading underscore.
    """
    pieces = []
    for position, char in enumerate(text):
        # Skip position 0 so "ListingsAndReviews" doesn't become "_listings...".
        if position > 0 and char.isupper():
            pieces.append("_")
        pieces.append(char)
    return "".join(pieces).lower().rstrip("_")

# coll_name="listingsAndReviews"
# convert_to_split_case(coll_name)

In [None]:
@task(log_prints=True, name="write-to-gcs", retries=3)
def write_to_gcs(df: pd.DataFrame, db_name: str, coll_name: str) -> None:
    """Upload ``df`` to GCS as ``<db_name>/<snake_case coll_name>.parquet.snappy``.

    Uses the Prefect ``GcsBucket`` block ``"prefect-gcs-block-airbnb"`` with
    snappy-compressed Parquet serialization. A local directory named
    ``db_name`` is also created if missing.

    Raises:
        OSError: logged and re-raised rather than swallowed, so the task is
            marked failed and Prefect's ``retries=3`` can take effect.
    """
    directory = Path(db_name)
    converted_coll_name = convert_to_split_case(coll_name)
    # GCS object names always use "/", so build the blob path in POSIX form.
    path_name = (directory / f"{converted_coll_name}.parquet.snappy").as_posix()
    try:
        os.makedirs(directory, exist_ok=True)
        gcs_block = GcsBucket.load("prefect-gcs-block-airbnb")
        gcs_block.upload_from_dataframe(
            df, to_path=path_name, serialization_format="parquet_snappy"
        )
    except OSError as error:
        print(error)
        raise

    print("Loaded data to GCS...Hooray!")

# write_to_gcs(df_, db_name="sample_airbnb", coll_name="listingsAndReviews")


# write-gcs-to-bq

## Function write-to-bq (optional)

In [None]:
# # Define a function that will create the BQ dataset and create a blank Dataframe
# @flow(log_prints=True, name="write-to-bq")
# def write_to_bq(df, db_name: str, coll_name:str) -> None:
#     gcp_credentials_block = GcpCredentials.load("prefect-gcs-2023-creds")
#     converted_coll_name = convert_to_split_case(coll_name)
#     df.to_gbq(
#         destination_table=f"{db_name}.{converted_coll_name}",
#         project_id="dtc-de-2023",
#         credentials=gcp_credentials_block.get_credentials_from_service_account(),
#         if_exists="append",
#         location="asia-southeast1",
#         table_schema=[
#             {"name": "_id", "type": "STRING"},
#             {"name": "listing_url", "type": "STRING"},
#             {"name": "name", "type": "STRING"},
#             {"name": "summary", "type": "STRING"},
#             {"name": "space", "type": "STRING"},
#             {"name": "description", "type": "STRING"},
#             {"name": "neighborhood_overview", "type": "STRING"},
#             {"name": "notes", "type": "STRING"},
#             {"name": "transit", "type": "STRING"},
#             {"name": "access", "type": "STRING"},
#             {"name":  "interaction", "type": "STRING"},
#             {"name": "house_rules", "type": "STRING"},
#             {"name": "property_type", "type": "STRING"},
#             {"name": "room_type", "type": "STRING"},
#             {"name": "bed_type", "type": "STRING"},
#             {"name": "minimum_nights", "type": "INTEGER"},
#             {"name": "maximum_nights", "type": "INTEGER"},
#             {"name": "cancellation_policy", "type": "STRING"},
#             {"name": "last_scraped", "type": "TIMESTAMP"},
#             {"name": "calendar_last_scraped", "type": "TIMESTAMP"},
#             {"name": "first_review", "type": "TIMESTAMP"},
#             {"name": "last_review", "type": "TIMESTAMP"},
#             {"name": "accommodates", "type": "INTEGER"},
#             {"name": "bedrooms", "type": "FLOAT"},
#             {"name": "beds", "type": "FLOAT"},
#             {"name": "number_of_reviews", "type": "INTEGER"},
#             {"name": "bathrooms", "type": "INTEGER"},
#             {"name": "amenities", "type": "STRING"},
#             {"name": "price", "type": "FLOAT"},
#             {"name": "security_deposit", "type": "FLOAT"},
#             {"name": "cleaning_fee", "type": "FLOAT"},
#             {"name": "extra_people", "type": "INTEGER"},
#             {"name": "guests_included", "type": "INTEGER"},
#             {"name": "reviews", "type": "STRING"},
#             {"name": "images_thumbnail_url", "type": "STRING"},
#             {"name": "images_medium_url", "type": "STRING"},
#             {"name": "images_picture_url", "type": "STRING"},
#             {"name": "images_xl_picture_url", "type": "STRING"},
#             {"name": "host_host_id", "type": "STRING"},
#             {"name": "host_host_url", "type": "STRING"},
#             {"name": "host_host_name", "type": "STRING"},
#             {"name": "host_host_location", "type": "STRING"},
#             {"name": "host_host_about", "type": "STRING"},
#             {"name": "host_host_response_time", "type": "STRING"},
#             {"name": "host_host_thumbnail_url", "type": "STRING"},
#             {"name": "host_host_picture_url", "type": "STRING"},
#             {"name": "host_host_neighbourhood", "type": "STRING"},
#             {"name": "host_host_response_rate", "type": "FLOAT"},
#             {"name": "host_host_is_superhost", "type": "BOOLEAN"},
#             {"name": "host_host_has_profile_pic", "type": "BOOLEAN"},
#             {"name": "host_host_identity_verified", "type": "BOOLEAN"},
#             {"name": "host_host_listings_count", "type": "INTEGER"},
#             {"name": "host_host_total_listings_count", "type": "INTEGER"},
#             {"name": "host_host_verifications", "type": "STRING"},
#             {"name": "address_street", "type": "STRING"},
#             {"name": "address_suburb", "type": "STRING"},
#             {"name": "address_government_area", "type": "STRING"},
#             {"name": "address_market", "type": "STRING"},
#             {"name": "address_country", "type": "STRING"},
#             {"name": "address_country_code", "type": "STRING"},
#             {"name": "address_location_type", "type": "STRING"},
#             {"name": "address_location_coordinates", "type": "STRING"},
#             {"name": "address_location_is_location_exact", "type": "BOOLEAN"},
#             {"name": "availability_availability_30", "type": "INTEGER"},
#             {"name": "availability_availability_60", "type": "INTEGER"},
#             {"name": "availability_availability_90", "type": "INTEGER"},
#             {"name": "availability_availability_365", "type": "INTEGER"},
#             {"name": "review_scores_review_scores_accuracy", "type": "FLOAT"},
#             {"name": "review_scores_review_scores_cleanliness", "type": "FLOAT"},
#             {"name": "review_scores_review_scores_checkin", "type": "FLOAT"},
#             {"name": "review_scores_review_scores_communication", "type": "FLOAT"},
#             {"name": "review_scores_review_scores_location", "type": "FLOAT"},
#             {"name": "review_scores_review_scores_value", "type": "FLOAT"},
#             {"name": "review_scores_review_scores_rating", "type": "FLOAT"},
#             {"name": "weekly_price", "type": "FLOAT"},
#             {"name": "monthly_price", "type": "FLOAT"},
#             {"name": "reviews_per_month", "type": "FLOAT"},
#             {"name": "_record_hash", "type": "STRING"},
#             {"name": "_bq_inserted_at", "type": "TIMESTAMP"},
#         ],
#     )
#     return


# write_to_bq(df=df_, db_name="sample_airbnb", coll_name="listingsAndReviews")


## Function create-bq-dataset

In [None]:
# Make sure the BigQuery destination exists by appending an empty DataFrame via pandas-gbq.
@flow(log_prints=True, name="write-mongodb-to-bq")
def create_bq_dataset(db_name: str, coll_name: str) -> None:
    """Append an empty DataFrame to ``dtc-de-2023.<db_name>.<snake_case coll_name>``.

    No rows are written. Presumably this relies on pandas-gbq's ``to_gbq``
    creating the dataset and/or table when missing -- TODO confirm.
    Credentials come from the Prefect ``GcpCredentials`` block
    ``"prefect-gcs-2023-creds"``; the job location is ``asia-southeast1``.

    NOTE(review): the Prefect flow name ``"write-mongodb-to-bq"`` does not
    describe this function; it is kept as-is because flow names are visible
    in Prefect run history.
    """
    creds_block = GcpCredentials.load("prefect-gcs-2023-creds")
    destination = f"{db_name}.{convert_to_split_case(coll_name)}"
    empty_frame = pd.DataFrame()
    empty_frame.to_gbq(
        destination_table=destination,
        project_id="dtc-de-2023",
        credentials=creds_block.get_credentials_from_service_account(),
        if_exists="append",
        location="asia-southeast1",
    )

# create_bq_dataset(db_name="sample_airbnb", coll_name="listingsAndReviews")

## Function get-bigquery-client

In [None]:
# Build an authenticated BigQuery client from the Prefect GCP credentials block.
@task(log_prints=True, name="get-bigquery-client")
def get_bigquery_client():
    """Return a ``bigquery.Client`` using the service account stored in the
    Prefect ``GcpCredentials`` block ``"prefect-gcs-2023-creds"``.

    No project is passed explicitly, so the default project is resolved by
    the BigQuery library (presumably from the credentials -- confirm).
    """
    service_account_creds = GcpCredentials.load(
        "prefect-gcs-2023-creds"
    ).get_credentials_from_service_account()
    return bigquery.Client(credentials=service_account_creds)

## Function write-gcs-to-bq

In [None]:
# schema=[
#             bigquery.SchemaField("_id", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("listing_url", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("name", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("summary", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("space", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("description", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("neighborhood_overview", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("notes", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("transit", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("access", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("interaction", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("house_rules", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("property_type", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("room_type", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("bed_type", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("minimum_nights", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("maximum_nights", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("cancellation_policy", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("last_scraped", "TIMESTAMP", mode="NULLABLE"),
#             bigquery.SchemaField("calendar_last_scraped", "TIMESTAMP", mode="NULLABLE"),
#             bigquery.SchemaField("first_review", "TIMESTAMP", mode="NULLABLE"),
#             bigquery.SchemaField("last_review", "TIMESTAMP", mode="NULLABLE"),
#             bigquery.SchemaField("accommodates", "INTEGER", mode="NULLABLE"),
#             bigquery.SchemaField("bedrooms", "FLOAT", mode="NULLABLE"),
#             bigquery.SchemaField("beds", "FLOAT", mode="NULLABLE"),
#             bigquery.SchemaField("number_of_reviews", "INTEGER", mode="NULLABLE"),
#             bigquery.SchemaField("bathrooms", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("amenities", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("price", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("security_deposit", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("cleaning_fee", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("extra_people", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("guests_included", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("reviews", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("images_thumbnail_url", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("images_medium_url", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("images_picture_url", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("images_xl_picture_url", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("host_host_id", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("host_host_url", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("host_host_name", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("host_host_location", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("host_host_about", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("host_host_response_time", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("host_host_thumbnail_url", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("host_host_picture_url", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("host_host_neighbourhood", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("host_host_response_rate", "FLOAT", mode="NULLABLE"),
#             bigquery.SchemaField("host_host_is_superhost", "BOOLEAN", mode="NULLABLE"),
#             bigquery.SchemaField(
#                 "host_host_has_profile_pic", "BOOLEAN", mode="NULLABLE"
#             ),
#             bigquery.SchemaField(
#                 "host_host_identity_verified", "BOOLEAN", mode="NULLABLE"
#             ),
#             bigquery.SchemaField(
#                 "host_host_listings_count", "INTEGER", mode="NULLABLE"
#             ),
#             bigquery.SchemaField(
#                 "host_host_total_listings_count", "INTEGER", mode="NULLABLE"
#             ),
#             bigquery.SchemaField("host_host_verifications", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("address_street", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("address_suburb", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("address_government_area", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("address_market", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("address_country", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("address_country_code", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("address_location_type", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField(
#                 "address_location_coordinates", "STRING", mode="NULLABLE"
#             ),
#             bigquery.SchemaField(
#                 "address_location_is_location_exact", "BOOLEAN", mode="NULLABLE"
#             ),
#             bigquery.SchemaField(
#                 "availability_availability_30", "INTEGER", mode="NULLABLE"
#             ),
#             bigquery.SchemaField(
#                 "availability_availability_60", "INTEGER", mode="NULLABLE"
#             ),
#             bigquery.SchemaField(
#                 "availability_availability_90", "INTEGER", mode="NULLABLE"
#             ),
#             bigquery.SchemaField(
#                 "availability_availability_365", "INTEGER", mode="NULLABLE"
#             ),
#             bigquery.SchemaField(
#                 "review_scores_review_scores_accuracy", "FLOAT", mode="NULLABLE"
#             ),
#             bigquery.SchemaField(
#                 "review_scores_review_scores_cleanliness", "FLOAT", mode="NULLABLE"
#             ),
#             bigquery.SchemaField(
#                 "review_scores_review_scores_checkin", "FLOAT", mode="NULLABLE"
#             ),
#             bigquery.SchemaField(
#                 "review_scores_review_scores_communication", "FLOAT", mode="NULLABLE"
#             ),
#             bigquery.SchemaField(
#                 "review_scores_review_scores_location", "FLOAT", mode="NULLABLE"
#             ),
#             bigquery.SchemaField(
#                 "review_scores_review_scores_value", "FLOAT", mode="NULLABLE"
#             ),
#             bigquery.SchemaField(
#                 "review_scores_review_scores_rating", "FLOAT", mode="NULLABLE"
#             ),
#             bigquery.SchemaField("weekly_price", "FLOAT", mode="NULLABLE"),
#             bigquery.SchemaField("monthly_price", "FLOAT", mode="NULLABLE"),
#             bigquery.SchemaField("reviews_per_month", "FLOAT", mode="NULLABLE"),
#             bigquery.SchemaField("_record_hash", "STRING", mode="NULLABLE"),
#             bigquery.SchemaField("_bq_inserted_at", "TIMESTAMP", mode="NULLABLE"),
#         ]

In [None]:
# Upload data from GCS to BigQuery
@flow(log_prints=True, name="write-gcs-to-bq")
def write_gcs_to_bq(db_name: str, coll_name: str) -> None:
    """Load the Parquet file produced by the GCS step into BigQuery.

    Source:      gs://airbnb-gcs-bucket/<db_name>/<snake_case coll_name>.parquet.snappy
    Destination: dtc-de-2023.<db_name>.<snake_case coll_name>

    The schema is auto-detected. No write disposition is set, so BigQuery's
    load-job default applies (WRITE_APPEND per the BigQuery docs). The call
    blocks until the load job finishes and then prints the destination
    table's total row count (not only the rows added by this load).
    """
    bq_client = get_bigquery_client()
    table_name = convert_to_split_case(coll_name)
    destination = f"dtc-de-2023.{db_name}.{table_name}"

    load_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        autodetect=True,
    )
    # The bucket must already exist: create it with Terraform or via
    # https://console.cloud.google.com/storage/create-bucket
    source_uri = f"gs://airbnb-gcs-bucket/{db_name}/{table_name}.parquet.snappy"

    # Start the load job (API request), then block until it completes.
    pending_load = bq_client.load_table_from_uri(
        source_uri, destination, job_config=load_config
    )
    pending_load.result()

    loaded_table = bq_client.get_table(destination)
    print(f"Loaded {loaded_table.num_rows} rows.")


# write_gcs_to_bq(db_name="sample_airbnb", coll_name="listingsAndReviews")


## Function dedup

In [None]:
# Remove duplicates
@flow(log_prints=True, name="removing-duplicates-from-bq")
def deduplicate_data():
    """Rewrite ``dtc-de-2023.sample_airbnb.listings_and_reviews`` with one row per ``_id``.

    Repeated loads can append the same ``_id`` more than once. The query
    replaces the table with, for every ``_id``, the row having the most
    recent ``_bq_inserted_at``. The call blocks until the query job finishes.

    Raises:
        Whatever ``QueryJob.result()`` raises if the query job fails.
    """
    client = get_bigquery_client()
    # ORDER BY ... DESC matters: with ascending order, row 1 would be the
    # OLDEST insert rather than the latest one.
    query_dedup = """
            -- CREATE A CTE TABLE
            CREATE OR REPLACE TABLE
                `dtc-de-2023.sample_airbnb.listings_and_reviews` AS
            WITH
                CTE1 AS (
                SELECT
                    *,
                    ROW_NUMBER() OVER(
                                    PARTITION BY _id
                                    ORDER BY _bq_inserted_at DESC) AS latest_row
                FROM `dtc-de-2023.sample_airbnb.listings_and_reviews`)

            -- FETCH ONLY THE LATEST ROW WHICH IS THE LATEST BQ INSERTED TIMESTAMP
            SELECT * EXCEPT (latest_row)
            FROM CTE1
            WHERE latest_row = 1
            """
    # Cap bytes billed at 10 GB (10**10 bytes); alternatively use
    # priority=bigquery.QueryPriority.BATCH.
    safe_config = bigquery.QueryJobConfig(maximum_bytes_billed=10**10)

    query_job = client.query(query_dedup, job_config=safe_config)
    # result() waits for the job to reach DONE (including while it is still
    # PENDING) and raises if the query failed, unlike polling on "RUNNING".
    query_job.result()

    print("Complete removing duplicates")
    print(f"Job {query_job.job_id} is currently in state {query_job.state}")

# deduplicate_data()

### Notes:
### In the case of the Airbnb dataset, you could partition the data by date or location, and then cluster the data within each partition based on columns such as the type of property, number of bedrooms, or amenities. This would allow you to quickly and easily analyze subsets of the data that are relevant to your specific analysis or query.

In [None]:
# Create a table with PARTITION AND CLUSTER
@flow(log_prints=True, name="create-partition-clustered-bq-table")
def create_partition_clustered_bq_table() -> "bigquery.table.RowIterator":
    """(Re)build ``listings_and_reviews_part_clust`` from ``listings_and_reviews``.

    The new table ``dtc-de-2023.sample_airbnb.listings_and_reviews_part_clust``
    is partitioned by ``DATE(last_scraped)`` and clustered by
    ``property_type``. The call blocks until the DDL query finishes.

    Returns:
        The row iterator returned by ``QueryJob.result()`` for the DDL
        statement. The annotation was previously ``None``, which did not
        match the actual return value.
    """
    print("Creating a separate partition and clustered table")
    client = get_bigquery_client()
    airbnb_part_clus = """
                        CREATE OR REPLACE TABLE 
                            `dtc-de-2023.sample_airbnb.listings_and_reviews_part_clust`
                        PARTITION BY
                            DATE(last_scraped)
                        CLUSTER BY
                            property_type AS (
                                SELECT *
                                FROM `dtc-de-2023.sample_airbnb.listings_and_reviews`
                            )
            """
    # Cap bytes billed at 10 GB (10**10 bytes).
    safe_config = bigquery.QueryJobConfig(maximum_bytes_billed=10**10)
    # result() blocks until the CREATE OR REPLACE statement completes.
    results_part_clus = client.query(airbnb_part_clus, job_config=safe_config).result()
    print("Done creating partitioned and clustered table")
    return results_part_clus


# create_partition_clustered_bq_table()


# Main

In [None]:
# Main ETL flow: MongoDB -> GCS -> BigQuery
@flow(log_prints=True, name="etl-mongodb-to-bq")
def etl_mongodb_to_bq(db_name: str, coll_name: str):
    """Run the full MongoDB -> GCS -> BigQuery pipeline for one collection.

    Steps, in order:
      1. extract every document of ``db_name.coll_name`` into a DataFrame;
      2. coerce dtypes and add ``_record_hash`` / ``_bq_inserted_at``;
      3. upload the frame to GCS as snappy Parquet;
      4. append an empty frame via pandas-gbq (presumably to create the
         dataset/table if missing);
      5. load the GCS Parquet file into BigQuery;
      6. replace the BigQuery table so it keeps one row per ``_id``;
      7. build a separate partitioned + clustered copy of the table.
    """
    raw_df = extract_collection(db_name, coll_name)
    typed_df = tweak_df(raw_df)

    write_to_gcs(typed_df, db_name, coll_name)
    create_bq_dataset(db_name, coll_name)
    write_gcs_to_bq(db_name, coll_name)

    deduplicate_data()
    create_partition_clustered_bq_table()

if __name__ == "__main__":
    # Inside Jupyter the kernel's __name__ is "__main__", so running this cell
    # launches the full pipeline for the AirBnB sample collection.
    db_name = "sample_airbnb"
    coll_name = "listingsAndReviews"
    etl_mongodb_to_bq(db_name=db_name, coll_name=coll_name)

# Others

In [None]:
# table_schema = [
# {"name": '_id', "type": str},
# {"name": 'listing_url', "type": str},
# {"name": 'name', "type": str},
# {"name": 'summary', "type": str},
# {"name": 'space', "type": str},
# {"name": 'description', "type": str},
# {"name": 'neighborhood_overview', "type": str},
# {"name": 'notes', "type": str},
# {"name": 'transit', "type": str},
# {"name": 'access', "type": str},
# {"name": 'interaction', "type": str},
# {"name": 'house_rules', "type": str},
# {"name": 'property_type', "type": str},
# {"name": 'room_type', "type": str},
# {"name": 'bed_type', "type": str},
# {"name": 'minimum_nights', "type": int},
# {"name": 'maximum_nights', "type": int},
# {"name": 'cancellation_policy', "type": str},
# {"name": 'last_scraped', "type": "datetime64[ns]"},
# {"name": 'calendar_last_scraped', "type": "datetime64[ns]"},
# {"name": 'first_review', "type": "datetime64[ns]"},
# {"name": 'last_review', "type": "datetime64[ns]"},
# {"name": 'accommodates', "type": int},
# {"name": 'bedrooms', "type": float},
# {"name": 'beds', "type": float},
# {"name": 'number_of_reviews', "type": int},
# {"name": 'bathrooms', "type": int},
# {"name": 'amenities', "type": str},
# {"name": 'price', "type": float},
# {"name": 'security_deposit', "type": float},
# {"name": 'cleaning_fee', "type": float},
# {"name": 'extra_people', "type": int},
# {"name": 'guests_included', "type": int},
# {"name": 'reviews', "type": str},
# {"name": 'images_thumbnail_url', "type": str},
# {"name": 'images_medium_url', "type": str},
# {"name": 'images_picture_url', "type": str},
# {"name": 'images_xl_picture_url', "type": str},
# {"name": 'host_host_id', "type": str},
# {"name": 'host_host_url', "type": str},
# {"name": 'host_host_name', "type": str},
# {"name": 'host_host_location', "type": str},
# {"name": 'host_host_about', "type": str},
# {"name": 'host_host_response_time', "type": str},
# {"name": 'host_host_thumbnail_url', "type": str},
# {"name": 'host_host_picture_url', "type": str},
# {"name": 'host_host_neighbourhood', "type": str},
# {"name": 'host_host_response_rate', "type": float},
# {"name": 'host_host_is_superhost', "type": bool},
# {"name": 'host_host_has_profile_pic', "type": bool},
# {"name": 'host_host_identity_verified', "type": bool},
# {"name": 'host_host_listings_count', "type": int},
# {"name": 'host_host_total_listings_count', "type": int},
# {"name": 'host_host_verifications', "type": str},
# {"name": 'address_street', "type": str},
# {"name": 'address_suburb', "type": str},
# {"name": 'address_government_area', "type": str},
# {"name": 'address_market', "type": str},
# {"name": 'address_country', "type": str},
# {"name": 'address_country_code', "type": str},
# {"name": 'address_location_type', "type": str},
# {"name": 'address_location_coordinates', "type": str},
# {"name": 'address_location_is_location_exact', "type": bool},
# {"name": 'availability_availability_30', "type": int},
# {"name": 'availability_availability_60', "type": int},
# {"name": 'availability_availability_90', "type": int},
# {"name": 'availability_availability_365', "type": int},
# {"name": 'review_scores_review_scores_accuracy', "type": float},
# {"name": 'review_scores_review_scores_cleanliness', "type": float},
# {"name": 'review_scores_review_scores_checkin', "type": float},
# {"name": 'review_scores_review_scores_communication', "type": float},
# {"name": 'review_scores_review_scores_location', "type": float},
# {"name": 'review_scores_review_scores_value', "type": float},
# {"name": 'review_scores_review_scores_rating', "type": float},
# {"name": 'weekly_price', "type": float},
# {"name": 'monthly_price', "type": float},
# {"name": 'reviews_per_month', "type": float},
# {"name": '_record_hash', "type": str},
# {"name": '_bq_inserted_at', "type": "datetime64[ns]"},]

# # print(table_schema)
