BigQuery Data Ingest from GCS

In [1]:
from google.cloud import bigquery

In [2]:
# BigQuery client shared by the load jobs and queries in this notebook.
# No project or credentials are passed, so they are resolved from the environment.
client = bigquery.Client()

In [3]:
# GCP project ID; used as the first component of fully-qualified table IDs.
project_id = "e2e-pipeline-cs"

In [4]:
# BigQuery dataset ID; used as the second component of fully-qualified table IDs.
dataset_id = "retail_data_warehouse"

In [5]:
def get_table_id(table_name):
    """Build the fully-qualified table ID ``<project_id>.<dataset_id>.<table_name>``.

    Args:
        table_name: Name of the table inside the notebook's dataset.

    Returns:
        The dotted identifier combining the module-level ``project_id`` and
        ``dataset_id`` with ``table_name``.
    """
    return "{}.{}.{}".format(project_id, dataset_id, table_name)

In [6]:
def get_bigquery_schema(schema):
    """Convert a list of column descriptions into BigQuery schema fields.

    Args:
        schema: Iterable of dicts, each with a ``"name"`` and a ``"type"`` key.

    Returns:
        A list with one ``bigquery.SchemaField`` per entry, in the same order.
    """
    fields = []
    for column in schema:
        fields.append(bigquery.SchemaField(column["name"], column["type"]))
    return fields

In [21]:
# Column descriptions for the `orders` table: one {"name", "type"} dict per column.
order_schema = [
    {"name": column_name, "type": column_type}
    for column_name, column_type in (
        ("order_id", "INTEGER"),
        ("order_date", "TIMESTAMP"),
        ("order_customer_id", "INTEGER"),
        ("order_status", "STRING"),
    )
]

In [50]:
# Column descriptions for the `order_items` table: one {"name", "type"} dict per column.
order_item_schema = [
    {"name": column_name, "type": column_type}
    for column_name, column_type in (
        ("order_item_id", "INTEGER"),
        ("order_item_order_id", "INTEGER"),
        ("order_item_product_id", "INTEGER"),
        ("order_item_quantity", "INTEGER"),
        ("order_item_subtotal", "FLOAT"),
        ("order_item_product_price", "FLOAT"),
    )
]

In [51]:
# Column descriptions for the `products` table: one {"name", "type"} dict per column.
# The second column was previously misspelled "product_cateogry_id"; the name given
# here becomes the BigQuery column name, so it is spelled correctly.
# NOTE(review): if `products` was already loaded with the misspelled column, the
# existing table likely has to be replaced before this schema matches — confirm.
product_schema = [
    {"name": "product_id", "type": "INTEGER"},
    {"name": "product_category_id", "type": "INTEGER"},
    {"name": "product_name", "type": "STRING"},
    {"name": "product_description", "type": "STRING"},
    {"name": "product_price", "type": "FLOAT"},
    {"name": "product_image", "type": "STRING"},
]

In [53]:
# Destination table name -> column descriptions used when loading that table.
table_schema = dict(
    orders=order_schema,
    order_items=order_item_schema,
    products=product_schema,
)

In [54]:
import time

In [56]:
# Load every table listed in `table_schema` from its GCS file into BigQuery.
for table_name, schema in table_schema.items():
    job_config = bigquery.LoadJobConfig(
        schema=get_bigquery_schema(schema),
        # CSV is already the load-job default; stated explicitly for readers.
        source_format=bigquery.SourceFormat.CSV,
        # Load jobs append by default, so re-running this cell would duplicate
        # every row. Truncating makes the cell safe to re-run.
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )

    load_job = client.load_table_from_uri(
        source_uris=f"gs://e2e-gcp-data-lake/retail_db/{table_name}/part-00000",
        destination=get_table_id(table_name),
        job_config=job_config,
    )

    # result() blocks until the job completes and raises if the load failed,
    # so no extra sleep is needed between tables.
    load_job.result()

In [7]:
# Fetch the `orders` table object; its row count and schema are read in later cells.
table = client.get_table(table=get_table_id("orders"))

In [8]:
table.num_rows

68883

BigQuery SQL

In [9]:
# Submit a preview query returning up to ten rows of the `orders` table.
query_job = client.query(
    f"""
        SELECT *
        FROM `{get_table_id("orders")}`
        LIMIT 10
    """
)

In [10]:
# Header names for the preview output, taken from the `orders` table schema.
table_columns = [field.name for field in table.schema]

In [11]:
table_rows = query_job.result()

In [12]:
from prettytable import PrettyTable

In [13]:
def get_query_result(columns, rows):
    """Render query rows as a PrettyTable.

    Args:
        columns: Header names, assigned to the table's ``field_names``.
        rows: Iterable of row objects; each must provide a ``values()`` method.

    Returns:
        The populated ``PrettyTable`` instance.
    """
    table_view = PrettyTable()
    table_view.field_names = columns
    for record in rows:
        table_view.add_row(record.values())
    return table_view

In [14]:
print(get_query_result(table_columns, table_rows))

+----------+---------------------------+-------------------+--------------+
| order_id |         order_date        | order_customer_id | order_status |
+----------+---------------------------+-------------------+--------------+
|  42834   | 2014-04-16 00:00:00+00:00 |        4321       |    CLOSED    |
|  42851   | 2014-04-16 00:00:00+00:00 |        5972       |    CLOSED    |
|  42864   | 2014-04-16 00:00:00+00:00 |        6024       |    CLOSED    |
|  42888   | 2014-04-16 00:00:00+00:00 |        3712       |    CLOSED    |
|  42890   | 2014-04-16 00:00:00+00:00 |        1961       |    CLOSED    |
|  42896   | 2014-04-16 00:00:00+00:00 |        7705       |    CLOSED    |
|  42898   | 2014-04-16 00:00:00+00:00 |        7770       |    CLOSED    |
|  42899   | 2014-04-16 00:00:00+00:00 |        3119       |    CLOSED    |
|  42906   | 2014-04-16 00:00:00+00:00 |        6215       |    CLOSED    |
|  42913   | 2014-04-16 00:00:00+00:00 |       10692       |    CLOSED    |
+----------+

In [15]:
# Revenue per order date and product, counting only COMPLETE or CLOSED orders.
# Results are sorted by date, then by revenue in descending order.
query_job = client.query(
    f"""
        SELECT 
            o.order_date,
            oi.order_item_product_id,
            round(sum(oi.order_item_subtotal), 2) AS revenue
        FROM 
            `{get_table_id("orders")}` AS o
            JOIN `{get_table_id("order_items")}` AS oi ON o.order_id = oi.order_item_order_id
        WHERE o.order_status IN ('COMPLETE', 'CLOSED')
        GROUP BY 1, 2
        ORDER BY 1, 3 DESC
    """
)

In [16]:
table_columns = ['order_date', 'product_id', 'revenue']

In [17]:
table_rows = query_job.result()

In [18]:
print(get_query_result(table_columns, table_rows))

+---------------------------+------------+----------+
|         order_date        | product_id | revenue  |
+---------------------------+------------+----------+
| 2013-07-25 00:00:00+00:00 |    1004    | 5599.72  |
| 2013-07-25 00:00:00+00:00 |    191     | 5099.49  |
| 2013-07-25 00:00:00+00:00 |    957     |  4499.7  |
| 2013-07-25 00:00:00+00:00 |    365     | 3359.44  |
| 2013-07-25 00:00:00+00:00 |    1073    | 2999.85  |
| 2013-07-25 00:00:00+00:00 |    1014    | 2798.88  |
| 2013-07-25 00:00:00+00:00 |    403     | 1949.85  |
| 2013-07-25 00:00:00+00:00 |    502     |  1650.0  |
| 2013-07-25 00:00:00+00:00 |    627     | 1079.73  |
| 2013-07-25 00:00:00+00:00 |    226     |  599.99  |
| 2013-07-25 00:00:00+00:00 |     24     |  319.96  |
| 2013-07-25 00:00:00+00:00 |    821     |  207.96  |
| 2013-07-25 00:00:00+00:00 |    625     |  199.99  |
| 2013-07-25 00:00:00+00:00 |    705     |  119.99  |
| 2013-07-25 00:00:00+00:00 |    572     |  119.97  |
| 2013-07-25 00:00:00+00:00 

Cumulative Aggregation

In [19]:
# Daily revenue for COMPLETE or CLOSED orders, plus a running total per order
# month (the window is partitioned by month and ordered by date).
query_job = client.query(
    f"""
        WITH daily_revenue AS (
            SELECT o.order_date,
                round(sum(oi.order_item_subtotal), 2) AS revenue
            FROM `{get_table_id("orders")}`  AS o
                JOIN `{get_table_id("order_items")}`  AS oi ON o.order_id = oi.order_item_order_id
            WHERE o.order_status IN ('COMPLETE', 'CLOSED')
            GROUP BY 1
        ) SELECT format_date('%Y%m', order_date) AS order_month,
            order_date,
            revenue,
            round(sum(revenue) OVER (
                PARTITION BY format_date('%Y%m', order_date)
                ORDER BY order_date
            ), 2) AS revenue_cum
        FROM daily_revenue
        ORDER BY 2;
    """
)

In [20]:
table_columns = ['order_month', 'order_date', 'revenue', 'revenue_cum']

In [21]:
table_rows = query_job.result()

In [22]:
print(get_query_result(table_columns, table_rows))

+-------------+---------------------------+----------+-------------+
| order_month |         order_date        | revenue  | revenue_cum |
+-------------+---------------------------+----------+-------------+
|    201307   | 2013-07-25 00:00:00+00:00 | 31547.23 |   31547.23  |
|    201307   | 2013-07-26 00:00:00+00:00 | 54713.23 |   86260.46  |
|    201307   | 2013-07-27 00:00:00+00:00 | 48411.48 |  134671.94  |
|    201307   | 2013-07-28 00:00:00+00:00 | 35672.03 |  170343.97  |
|    201307   | 2013-07-29 00:00:00+00:00 | 54579.7  |  224923.67  |
|    201307   | 2013-07-30 00:00:00+00:00 | 49329.29 |  274252.96  |
|    201307   | 2013-07-31 00:00:00+00:00 | 59212.49 |  333465.45  |
|    201308   | 2013-08-01 00:00:00+00:00 | 49160.08 |   49160.08  |
|    201308   | 2013-08-02 00:00:00+00:00 | 50688.58 |   99848.66  |
|    201308   | 2013-08-03 00:00:00+00:00 | 43416.74 |   143265.4  |
|    201308   | 2013-08-04 00:00:00+00:00 | 35093.01 |  178358.41  |
|    201308   | 2013-08-05 00:00:0

Compute Ranks and Filter by Rank

In [23]:
# For each order date, rank products by daily revenue (COMPLETE or CLOSED orders
# only) with dense_rank and keep the rows whose rank is 3 or better.
query_job = client.query(
    f"""
        WITH daily_product_revenue AS (
            SELECT o.order_date,
                oi.order_item_product_id,
                round(sum(oi.order_item_subtotal), 2) AS revenue
            FROM `{get_table_id("orders")}` AS o
                JOIN `{get_table_id("order_items")}` AS oi ON o.order_id = oi.order_item_order_id
            WHERE o.order_status IN ('COMPLETE', 'CLOSED')
            GROUP BY 1, 2
        ) SELECT * FROM (
            SELECT 
                format_date('%Y%m', order_date) AS order_month,
                order_date,
                order_item_product_id,
                revenue,
                dense_rank() OVER (
                    PARTITION BY order_date
                    ORDER BY revenue DESC
                ) AS drank
            FROM daily_product_revenue
        ) WHERE drank <= 3  
        ORDER BY 2, 4 DESC
    """
)

In [24]:
table_columns = ['order_month', 'order_date', 'order_item_product_id', 'revenue', 'drank']

In [25]:
table_rows = query_job.result()

In [26]:
print(get_query_result(table_columns, table_rows))

+-------------+---------------------------+-----------------------+----------+-------+
| order_month |         order_date        | order_item_product_id | revenue  | drank |
+-------------+---------------------------+-----------------------+----------+-------+
|    201307   | 2013-07-25 00:00:00+00:00 |          1004         | 5599.72  |   1   |
|    201307   | 2013-07-25 00:00:00+00:00 |          191          | 5099.49  |   2   |
|    201307   | 2013-07-25 00:00:00+00:00 |          957          |  4499.7  |   3   |
|    201307   | 2013-07-26 00:00:00+00:00 |          1004         | 10799.46 |   1   |
|    201307   | 2013-07-26 00:00:00+00:00 |          365          | 7978.67  |   2   |
|    201307   | 2013-07-26 00:00:00+00:00 |          957          | 6899.54  |   3   |
|    201307   | 2013-07-27 00:00:00+00:00 |          1004         | 9599.52  |   1   |
|    201307   | 2013-07-27 00:00:00+00:00 |          191          |  5999.4  |   2   |
|    201307   | 2013-07-27 00:00:00+00:00 |

Integration with Pandas


In [27]:
import pandas as pd

In [28]:
# SQL text counting orders per status, largest count first.
query = f"""
    SELECT order_status, count(*) AS order_count
    FROM `{get_table_id("orders")}` 
    GROUP BY 1
    ORDER BY 2 DESC
"""

In [29]:
# Run the query with the existing BigQuery client and materialise the result as
# a pandas DataFrame. `pd.read_gbq` is deprecated since pandas 2.2, so the
# client's own DataFrame conversion is used instead.
df = client.query(query).to_dataframe()

In [30]:
df.head()

Unnamed: 0,order_status,order_count
0,COMPLETE,22899
1,PENDING_PAYMENT,15030
2,PROCESSING,8275
3,PENDING,7610
4,CLOSED,7556


External Tables

Integration with GCS

-- Defines an external table over the CSV files under the GCS `orders` prefix.
-- NOTE(review): a table named `orders` is loaded into this dataset earlier in
-- the notebook; this statement would presumably fail while that table exists —
-- confirm the intended table name.
CREATE EXTERNAL TABLE retail_data_warehouse.orders (
    order_id INTEGER,
    order_date TIMESTAMP,
    order_customer_id INTEGER,
    order_status STRING
) OPTIONS (
    format = 'CSV',
    uris = ['gs://e2e-gcp-data-lake/retail_db/orders/*']
);

Integration with Cloud SQL

Use cases:
1. Proofs of concept
2. Temporary dashboards
3. Troubleshooting data quality
4. Ad hoc data analysis

-- Passes the inner SQL string to the external database through EXTERNAL_QUERY
-- and selects every column of its result.
-- NOTE(review): the first argument looks like a placeholder for the connection
-- ID — replace it with the real connection before running.
SELECT * 
FROM EXTERNAL_QUERY(
    "project_id.cloud_sql_db",
    "SELECT order_date, count(*) AS order_count FROM orders GROUP BY 1 ORDER BY 2 DESC"
);