SCHEMA PLAN (9 Tables in total)

sellers:
*   seller_id str 
*   seller_zip_code_prefix int
*   seller_city str
*   seller_state str

customers:
*   customer_id str
*   customer_unique_id str
*   customer_zip_code_prefix int
*   customer_city str
*   customer_state str

category_name:
*   product_category_name str
*   product_category_name_english str

products:
*   product_id str
*   product_category_name str
*   product_name_lenght int
*   product_description_lenght int
*   product_photos_qty int
*   product_weight_g int
*   product_length_cm int
*   product_height_cm int
*   product_width_cm int

orders:
*   order_id str
*   customer_id str
*   order_status str
*   order_purchase_timestamp str
*   order_approved_at str
*   order_delivered_carrier_date str
*   order_delivered_customer_date str
*   order_estimated_delivery_date str

order_items:
*   order_id str
*   order_item_id int
*   product_id str
*   seller_id str
*   shipping_limit_date str
*   price float
*   freight_value float

order_payments:
*   order_id str
*   payment_sequential int
*   payment_type str
*   payment_installments int
*   payment_value float

order_reviews:
*   review_id str
*   order_id str
*   review_score int
*   review_comment_title str
*   review_comment_message str
*   review_creation_date str
*   review_answer_timestamp str

geolocation:
*   geolocation_zip_code_prefix int
*   geolocation_lat str (see note below)
*   geolocation_lng str (see note below)
*   geolocation_city str
*   geolocation_state str

Note: A float or double is a binary value and does not have a well defined number of decimal places.

"Decimal places" have meaning only when you print the number.

To store latitude and longitude values and maintain full accuracy, store either the text string as received from the GPS unit, or store the long integer values maintained internally by TinyGPS. You will lose accuracy if you save the float value. -- from https://forum.arduino.cc/t/storing-latitute-and-longitude-using-a-double/670691/4 
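
To make the note concrete, here is a minimal sketch using only the Python standard library. The coordinate string is a made-up sample, not a value taken from the Olist files. It checks that the text form is kept verbatim, that a 64-bit float holds a nearby binary value rather than the exact decimal, and that a 32-bit float (the case the forum post is about) drifts further.

```python
import struct
from decimal import Decimal

raw = "-23.545621281154"  # hypothetical latitude as it might appear in a CSV

as_text = str(raw)        # stored as text: the digits are kept verbatim
as_float64 = float(raw)   # nearest 64-bit binary value
# Round-trip through a 32-bit float, as on a small microcontroller
as_float32 = struct.unpack("f", struct.pack("f", as_float64))[0]

print(as_text == raw)                        # is the text unchanged?
print(Decimal(as_float64) == Decimal(raw))   # is the binary value the exact decimal?
print(abs(as_float32 - as_float64))          # error introduced by 32-bit storage
```

Whether the extra precision matters depends on the use case; keeping the columns as strings simply avoids the question at load time.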

In [None]:
### CREATE BQ TABLES - create the tables in Google BigQuery with a predefined schema

from google.cloud import bigquery
import pandas as pd

client = bigquery.Client(project='precise-victory-348205')

# Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name"

table_id = 'precise-victory-348205.olist.sellers'
schema = [
    bigquery.SchemaField("seller_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("seller_zip_code_prefix", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("seller_city", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("seller_state", "STRING", mode="REQUIRED"),
]
table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table)  # Make an API request.
print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))


table_id = 'precise-victory-348205.olist.customers'
schema = [
    bigquery.SchemaField("customer_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("customer_unique_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("customer_zip_code_prefix", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("customer_city", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("customer_state", "STRING", mode="REQUIRED"),
]
table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table)  # Make an API request.
print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))


table_id = 'precise-victory-348205.olist.category_name'
schema = [
    bigquery.SchemaField("product_category_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("product_category_name_english", "STRING", mode="REQUIRED"),
]
table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table)  # Make an API request.
print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))


table_id = 'precise-victory-348205.olist.products'
schema = [
    bigquery.SchemaField("product_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("product_category_name", "STRING"),
    bigquery.SchemaField("product_name_lenght", "INTEGER"),
    bigquery.SchemaField("product_description_lenght", "INTEGER"),
    bigquery.SchemaField("product_photos_qty", "INTEGER"),
    bigquery.SchemaField("product_weight_g", "INTEGER"),
    bigquery.SchemaField("product_length_cm", "INTEGER"),
    bigquery.SchemaField("product_height_cm", "INTEGER"),
    bigquery.SchemaField("product_width_cm", "INTEGER"),
]
table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table)  # Make an API request.
print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))


table_id = 'precise-victory-348205.olist.orders'
schema = [
    bigquery.SchemaField("order_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("customer_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("order_status", "STRING"),
    bigquery.SchemaField("order_purchase_timestamp", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("order_approved_at", "STRING"),
    bigquery.SchemaField("order_delivered_carrier_date", "STRING"),
    bigquery.SchemaField("order_delivered_customer_date", "STRING"),
    bigquery.SchemaField("order_estimated_delivery_date", "STRING"),    
]
table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table)  # Make an API request.
print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))


table_id = 'precise-victory-348205.olist.order_items'
schema = [
    bigquery.SchemaField("order_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("order_item_id", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("product_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("seller_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("shipping_limit_date", "STRING"),
    bigquery.SchemaField("price", "FLOAT"),
    bigquery.SchemaField("freight_value", "FLOAT"),
]
table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table)  # Make an API request.
print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))


table_id = 'precise-victory-348205.olist.order_payments'
schema = [
    bigquery.SchemaField("order_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("payment_sequential", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("payment_type", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("payment_installments", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("payment_value", "FLOAT"),
]
table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table)  # Make an API request.
print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))


table_id = 'precise-victory-348205.olist.order_reviews'
schema = [
    bigquery.SchemaField("review_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("order_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("review_score", "INTEGER"),
    bigquery.SchemaField("review_comment_title", "STRING"),
    bigquery.SchemaField("review_comment_message", "STRING"),
    bigquery.SchemaField("review_creation_date", "STRING"),
    bigquery.SchemaField("review_answer_timestamp", "STRING"),    
]
table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table)  # Make an API request.
print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))


table_id = 'precise-victory-348205.olist.geolocation'
schema = [
    bigquery.SchemaField("geolocation_zip_code_prefix", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("geolocation_lat", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("geolocation_lng", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("geolocation_city", "STRING"),
    bigquery.SchemaField("geolocation_state", "STRING"),
]
table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table)  # Make an API request.
print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))
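
The nine blocks above repeat the same three calls. One possible way to shorten them is to describe each table as plain data and create the tables in a loop. This is only a sketch: `SCHEMAS` and `create_tables` are hypothetical names, only two of the nine tables are listed, and `exists_ok=True` is added so the cell can be re-run without an error if a table already exists.

```python
# Sketch: describe each table once as plain data, then create them in a loop.
# Only two of the nine tables are listed; the rest would follow the same pattern.
SCHEMAS = {
    "sellers": [
        ("seller_id", "STRING", "REQUIRED"),
        ("seller_zip_code_prefix", "INTEGER", "REQUIRED"),
        ("seller_city", "STRING", "REQUIRED"),
        ("seller_state", "STRING", "REQUIRED"),
    ],
    "category_name": [
        ("product_category_name", "STRING", "REQUIRED"),
        ("product_category_name_english", "STRING", "REQUIRED"),
    ],
}


def create_tables(client, dataset, schemas=SCHEMAS):
    """Create one BigQuery table per entry in `schemas` (hypothetical helper)."""
    # Imported here so the schema spec can be inspected without the library installed.
    from google.cloud import bigquery

    for name, columns in schemas.items():
        fields = [bigquery.SchemaField(col, typ, mode=mode) for col, typ, mode in columns]
        table = bigquery.Table(f"{dataset}.{name}", schema=fields)
        # exists_ok=True avoids an error when the table was created earlier.
        table = client.create_table(table, exists_ok=True)
        print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")
```

With the `client` from the cell above, the call would be `create_tables(client, "precise-victory-348205.olist")`.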



In [None]:
### CSV to DF - load all CSVs hosted on Google Drive and store each as a DataFrame

import pandas as pd
# Libraries to read csv file hosted on Google Drive into Google Colab:
!pip install python-tds
!pip install -U -q PyDrive
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials
# Authenticate and create the PyDrive client.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

# Google Drive file IDs (taken from the shareable link of each hosted CSV file)
sellers_link = '1cG5h5AW0hE7OmDZ0YPtZLVSVDnL7mCkQ' 
category_name_link = '1Y1oiP5U1jBO0Ji5FJyRYKULw1T953obp'
products_link = '1L8a9t3obUYIH0RzCfaCcqil7rkjgHOb-'
order_reviews_link = '1_XXIOg3_rDSGKW61EI7MIZ0UqYYpLzfU'
orders_link = '1PaeE67IdrWocbY5isimVwezk7EQK5pAn'
order_payments_link = '1Tfrf7yof8pYXcK4pFuVRt28rWwW9BW4F'
order_items_link = '1IwOvsLbP6ub9BDsXq-FWbd3d7yl1ZJvI'
geolocation_link = '1yL1iSIwMjskL_kKza6kQH8TuI566lYnu'
customers_link = '10cfto60JXwwIe8lewY2dao3KB9TSOuu8'

# Read the Google Drive hosted CSVs into DF
downloaded = drive.CreateFile({'id':sellers_link}) 
downloaded.GetContentFile('olist_sellers_dataset.csv')  
dfsellers = pd.read_csv('olist_sellers_dataset.csv')

downloaded = drive.CreateFile({'id':category_name_link}) 
downloaded.GetContentFile('product_category_name_translation.csv')  
dfcategoryname = pd.read_csv('product_category_name_translation.csv')

downloaded = drive.CreateFile({'id':products_link}) 
downloaded.GetContentFile('olist_products_dataset.csv')  
dfproducts = pd.read_csv('olist_products_dataset.csv')

downloaded = drive.CreateFile({'id':order_reviews_link}) 
downloaded.GetContentFile('olist_order_reviews_dataset.csv')  
dforderreviews = pd.read_csv('olist_order_reviews_dataset.csv')

downloaded = drive.CreateFile({'id':orders_link}) 
downloaded.GetContentFile('olist_orders_dataset.csv')  
dforders = pd.read_csv('olist_orders_dataset.csv')

downloaded = drive.CreateFile({'id':order_payments_link}) 
downloaded.GetContentFile('olist_order_payments_dataset.csv')  
dforderpayments = pd.read_csv('olist_order_payments_dataset.csv')

downloaded = drive.CreateFile({'id':order_items_link}) 
downloaded.GetContentFile('olist_order_items_dataset.csv')  
dforderitems = pd.read_csv('olist_order_items_dataset.csv')

downloaded = drive.CreateFile({'id':geolocation_link}) 
downloaded.GetContentFile('olist_geolocation_dataset.csv')  
dfgeolocation = pd.read_csv('olist_geolocation_dataset.csv')

downloaded = drive.CreateFile({'id':customers_link}) 
downloaded.GetContentFile('olist_customers_dataset.csv')  
dfcustomers = pd.read_csv('olist_customers_dataset.csv')

# All Datasets are now stored in Pandas Dataframes


In [None]:
### FEED DFs to BigQuery

def df_to_BQ(dfname, tablename):
  """Load a DataFrame into the olist table `tablename`, then print the table's row and column counts.

  Relies on the global `client` created in the table-creation cell.
  """
  table_id = "precise-victory-348205.olist."+tablename
  print("===> Loading DF to BQ table",table_id)
  job = client.load_table_from_dataframe(dfname, table_id)  # Make an API request.
  job.result()  # Wait for the job to complete.

  table = client.get_table(table_id)  # Make an API request.
  print("Loaded {} rows and {} columns to {}".format(table.num_rows, len(table.schema), table_id))


In [None]:
# Run the load for every dataset (WARNING: do not re-execute; each run appends the rows again and creates duplicate data)

df_to_BQ(dfsellers, "sellers")               
df_to_BQ(dfcustomers, "customers")           
df_to_BQ(dfcategoryname, "category_name")    
df_to_BQ(dfproducts, "products")             
df_to_BQ(dforders, "orders")                 
df_to_BQ(dforderitems, "order_items")         
df_to_BQ(dforderpayments, "order_payments")  
df_to_BQ(dforderreviews, "order_reviews")
dfgeolocation['geolocation_lat'] = dfgeolocation['geolocation_lat'].astype(str)   # cast to str; see the note under SCHEMA PLAN for the reason
dfgeolocation['geolocation_lng'] = dfgeolocation['geolocation_lng'].astype(str)   # cast to str; see the note under SCHEMA PLAN for the reason
df_to_BQ(dfgeolocation, "geolocation")       
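
The duplicate-data warning above comes from load jobs appending to an existing table by default. A possible variant, sketched below, passes a `LoadJobConfig` with `WRITE_TRUNCATE` so that a re-run replaces the rows instead. `df_to_bq_overwrite` is a hypothetical name, and this is an assumption about how the notebook could be made re-runnable, not what it currently does.

```python
def df_to_bq_overwrite(client, df, table_id):
    """Load `df` into `table_id`, replacing existing rows (hypothetical variant of df_to_BQ)."""
    from google.cloud import bigquery  # same library the notebook already uses

    job_config = bigquery.LoadJobConfig(
        # WRITE_TRUNCATE replaces the table contents instead of appending to them.
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )
    job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
    job.result()  # Wait for the load job to finish.
    return client.get_table(table_id).num_rows
```

A call would look like `df_to_bq_overwrite(client, dfsellers, "precise-victory-348205.olist.sellers")`. Truncating may also affect the table's schema, so it is worth checking the `REQUIRED` modes after the first run.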


CLEANING REQUIRED (By Table)
*   OK sellers
*   OK customers
*   NOK geolocation (names contain odd characters)
*   NOK products (missing category, name, description)
*   OK category_name (ignore English misspellings)
*   OK orders (some fields are missing, but this is expected given the delivery status)
*   OK order_items
*   NOK order_reviews (comment title and comment message contain blanks and are messy)
*   OK order_payments
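
As an illustration of how a table might be judged OK or NOK, the sketch below counts blank fields per column with the standard library. The rows are made up to resemble the products table; they are not real Olist data.

```python
import csv
import io
from collections import Counter

# Made-up rows shaped like the products table; some fields are left blank.
sample = io.StringIO(
    "product_id,product_category_name,product_name_lenght\n"
    "p1,perfumaria,40\n"
    "p2,,\n"
    "p3,artes,\n"
)

missing = Counter()
for row in csv.DictReader(sample):
    for column, value in row.items():
        # Treat absent or whitespace-only fields as missing.
        if value is None or value.strip() == "":
            missing[column] += 1

print(dict(missing))
```

On the DataFrames already loaded in this notebook, `dfproducts.isna().sum()` should give comparable per-column counts.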

In [None]:
# airflow trial
!pip install apache-airflow

In [None]:
!airflow db init
!airflow version

In [30]:
#!mkdir /root/airflow/dags
!airflow tasks test mingtest print 1

[2022-04-27 11:51:54,408] {dagbag.py:500} INFO - Filling up the DagBag from /root/airflow/dags
[2022-04-27 11:51:54,828] {taskinstance.py:1043} INFO - Dependencies all met for <TaskInstance: mingtest.print None [None]>
[2022-04-27 11:51:54,834] {taskinstance.py:1043} INFO - Dependencies all met for <TaskInstance: mingtest.print None [None]>
[2022-04-27 11:51:54,835] {taskinstance.py:1249} INFO - 
--------------------------------------------------------------------------------
[2022-04-27 11:51:54,835] {taskinstance.py:1250} INFO - Starting attempt 1 of 2
[2022-04-27 11:51:54,835] {taskinstance.py:1251} INFO - 
--------------------------------------------------------------------------------
[2022-04-27 11:51:54,837] {taskinstance.py:1270} INFO - Executing <Task(PythonOperator): pri