In [1]:
from pyspark.sql.utils import ParseException, AnalysisException
def run_sql(query):
    try:
        return spark.sql(query).toPandas().head()
    except (ParseException, AnalysisException) as ex:
        print(str(ex).replace('\\n', '\n')[2:-2])
    

In [2]:
import pandas as pd
pd.set_option('display.max_columns', 45)
pd.set_option('display.max_colwidth', -1)

### Identifying a 'Trusted' Order

First of all, we need to understand what should be considered a valid order, i.e. it's important to avoid saving corrupted and false orders (from changed apks, time travelers..) on the trusted dataset. Which could end up on missleading analysis.
Another option would be to add flags on each row, classifying on trusted/corrupted/time_traveler.. etc

That being said, a trusted order should respect the following constraints:

- Coherent flow:
    - REGISTERED > PLACED > CONCLUDED
    - REGISTERED > PLACED > CANCELLED
    - REGISTERED > CANCELLED

- Have at least a Order Status Attributed
    - OrderId must match on registration date between tables


### Matching Order with Order Status

In [3]:
query = """
WITH valid_orders AS (
    SELECT
        order_id,
        COALESCE(CAST(CAST(MAX(IF(value = 'REGISTERED', created_at, NULL)) AS DATE) AS STRING), '') AS registration_date
    FROM raw_layer.order_status
    GROUP BY order_id
),


all_orders AS (
    SELECT
        order_id,
        COALESCE(CAST(CAST(order_created_at AS DATE) AS STRING), '') AS registration_date
    FROM raw_layer.order
),

orders_validation AS (
    SELECT 
        all_orders.*,
        all_orders.order_id IS NOT NULL AND valid_orders.order_id IS NOT NULL AS registration_date_is_valid,
        locate('2018', all_orders.registration_date) <> 0 AS 2018_data
    FROM all_orders
    FULL OUTER JOIN valid_orders ON 
        all_orders.order_id = valid_orders.order_id AND
        all_orders.registration_date = valid_orders.registration_date
),

cross_check AS (
    SELECT
        orders_validation.*,
        valid_orders.registration_date AS valid_registration_date,
        orders_validation.order_id IS NOT NULL AND valid_orders.registration_date IS NULL AS status_not_found,
        IF(valid_orders.registration_date IS NOT NULL AND orders_validation.registration_date IS NOT NULL,
            DATEDIFF(valid_orders.registration_date, orders_validation.registration_date),
            NULL) AS delay
    FROM orders_validation
    LEFT JOIN valid_orders ON
        valid_orders.order_id = orders_validation.order_id
),

count_duplicates AS (
    SELECT
        *,
        MAX(registration_date_is_valid) OVER (PARTITION BY order_id) AS has_valid_counterpart
    FROM cross_check
)

SELECT
    registration_date_is_valid,
    status_not_found,
    COUNT(*) AS occurences,
    COUNT(DISTINCT order_id) AS distinct_orders,
    MIN(registration_date) AS min_registration_date,
    MAX(registration_date) AS max_registration_date,
    SUM(IF(2018_data, 1, 0)) AS 2018_data,
    MIN(delay) AS min_delay,
    MAX(delay) AS max_delay,
    SUM(IF(has_valid_counterpart, 1, 0)) AS valid_counterparts
FROM count_duplicates
GROUP BY 1,2
ORDER BY 1,2
"""

run_sql(query)

Unnamed: 0,registration_date_is_valid,status_not_found,occurences,distinct_orders,min_registration_date,max_registration_date,2018_data,min_delay,max_delay,valid_counterparts
0,False,False,1243179,1242368,2018-12-03,2019-01-31,1241959,-29.0,59.0,1241758
1,False,True,14,8,2018-12-10,2019-01-30,6,,,0
2,True,False,2440457,2440457,2019-01-01,2019-01-31,0,0.0,0.0,2440457


### Conclusions

Disregarding the order flow, at least 2440457 Orders seems to match with status registration

For some odd reason, there is a large number of valid orders with invalid counterparts, all or almost all of them placed on 2018.

Since it seems a forced corruption of data for test purposes, I will assume that all of the valid orders with valid flow should be considered trusted

### Looking into Status Flow

In [4]:
query = """
WITH order_timestamps AS (
    SELECT
        order_id,
        MIN(IF(value = 'REGISTERED', created_at, NULL)) AS registration_time,
        MIN(IF(value = 'PLACED', created_at, NULL)) AS place_time,
        MIN(IF(value = 'CONCLUDED', created_at, NULL)) AS conclusion_time,
        MIN(IF(value = 'CANCELLED', created_at, NULL)) AS cancelation_time
    FROM raw_layer.order_status
    GROUP BY order_id
),

valid_order_flows AS (
    SELECT
        order_id,
        registration_time IS NOT NULL
            AND (
                (
                place_time IS NOT NULL
                    AND registration_time <= place_time 
                    AND (
                        (cancelation_time IS NULL AND place_time <= conclusion_time) OR 
                        (conclusion_time IS NULL AND place_time <= cancelation_time)
                    ) 
                 )
                 OR (
                cancelation_time IS NOT NULL
                    AND registration_time <= cancelation_time 
                 )
            ) AS valid_flow
    FROM order_timestamps
)

SELECT
    valid_flow,
    COUNT(DISTINCT order_id)
FROM valid_order_flows
GROUP BY 1
        
"""
run_sql(query)

Unnamed: 0,valid_flow,count(DISTINCT order_id)
0,True,2407489
1,False,33578


### Conclusions

It seems almost all previous order expected to be valid, actually have a valid flow.

2407489 over 2440457

### Transformation Script

For the transformation, sensitive data must be considered:
- Consumer:
    - customer_name
    - customer_phone_number
- Order
    - cpf
    - customer_name
    
Show last status only
    
And restaurant local date partition is a nice to have. As seen on Order Study, order_created_at is at zulu time, aka utc

In [5]:
spark.sql("SET spark.sql.parser.quotedRegexColumnNames=true")

query = """
WITH order_timestamps AS (
    SELECT
        order_id,
        MIN(IF(value = 'REGISTERED', created_at, NULL)) AS registration_time,
        MIN(IF(value = 'PLACED', created_at, NULL)) AS place_time,
        MIN(IF(value = 'CONCLUDED', created_at, NULL)) AS conclusion_time,
        MIN(IF(value = 'CANCELLED', created_at, NULL)) AS cancelation_time
    FROM raw_layer.order_status
    GROUP BY order_id
),

valid_order_flows AS (
    SELECT
        order_id,
        registration_time IS NOT NULL
            AND (
                (
                place_time IS NOT NULL
                    AND registration_time <= place_time 
                    AND (
                        (cancelation_time IS NULL AND place_time <= conclusion_time) OR 
                        (conclusion_time IS NULL AND place_time <= cancelation_time)
                    ) 
                 )
                 OR (
                cancelation_time IS NOT NULL
                    AND registration_time <= cancelation_time 
                 )
            ) AS valid_flow
    FROM order_timestamps
),

valid_orders AS (
    SELECT
        order_id
    FROM valid_order_flows
    WHERE valid_flow
),

valid_order_final_status AS (
    SELECT
        order_id,
        COALESCE(CAST(CAST(MAX(registration_time) AS DATE) AS STRING), '') AS registration_date,
        MAX(status) AS status
    FROM (
        SELECT
            order_id,
            IF(value = 'REGISTERED', created_at, NULL) AS registration_time,
            LAST_VALUE(value) OVER (PARTITION BY order_id ORDER BY created_at ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS status
        FROM raw_layer.order_status
        WHERE order_id IN (SELECT order_id FROM valid_orders)
    )
    GROUP BY order_id    
),

order_registration AS (
    SELECT
        *,
        COALESCE(CAST(CAST(order_created_at AS DATE) AS STRING), '') AS registration_date
    FROM raw_layer.order
),

order_with_status AS (
    SELECT
        order.`(cpf|customer_name|registration_date)?+.+`,
        SHA2(cpf, 256) AS anon_cpf,
        SHA2(customer_name, 256) AS anon_order_customer_name,
        valid_order_final_status.status AS status
    FROM order_registration AS order
    LEFT JOIN valid_order_final_status ON
        valid_order_final_status.order_id = order.order_id AND
        order.registration_date = valid_order_final_status.registration_date 
    WHERE valid_order_final_status.order_id IS NOT NULL
),

anon_consumer AS (
    SELECT
        consumer.`(customer_name|customer_phone_number|language|created_at|active)?+.+`,
        consumer.language AS customer_language,
        consumer.created_at AS customer_created_at,
        consumer.active AS customer_active,
        SHA2(customer_phone_number, 256) AS anon_customer_phone_number,
        SHA2(customer_name, 256) AS anon_customer_name
    FROM raw_layer.consumer
),

restaurant AS (
    SELECT
        restaurant.`(created_at|enabled|price_range|average_ticket|takeout_time|delivery_time|minimum_order_value)?+.+`,
        restaurant.created_at AS restaurant_created_at,
        restaurant.enabled AS restaurant_enabled,
        restaurant.price_range AS restaurant_price_range,
        restaurant.average_ticket AS restaurant_average_ticket,
        restaurant.takeout_time AS restaurant_takeout_time,
        restaurant.delivery_time AS restaurant_delivery_time,
        restaurant.minimum_order_value AS restaurant_minimum_order_value
    FROM raw_layer.restaurant
),

order_extended AS (
    SELECT
        order_with_status.*,
        restaurant.`(id)?+.+`,
        anon_consumer.`(customer_id)?+.+`
    FROM order_with_status
    LEFT JOIN restaurant ON
        restaurant.id = order_with_status.merchant_id
    LEFT JOIN anon_consumer ON
        anon_consumer.customer_id = order_with_status.customer_id
),

order_localized AS (
    SELECT
        *,
        FROM_UTC_TIMESTAMP(CAST(order_created_at AS STRING), merchant_timezone) AS local_created_at
    FROM order_extended
)

SELECT
    *
FROM order_localized
ORDER BY order_id
        
"""
transformed_data = spark.sql(query)
print("Orders Count: {0}".format(transformed_data.count()))
transformed_data.show(1, truncate=False, vertical=True)

Orders Count: 2407316
-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 customer_id                    | ca58cf49-cde3-46ee-8192-989e8e48a2f8                                                                                                                                                                                                                                                                  
 delivery_address_city          | GOIANIA                                                                                                                                                                                                                                                                                       

### Conclusions
From the 3683650 occurences of 2441075‬ distinct orders, only 2407316 are actually valid orders