# The Objective
This exercise revolves around translating a given SQL query into a Python script. The query in question is the ams.rules_input.sql query. 

**The objective is to generate an exact copy, in a pandas DataFrame, of data_engineer.ams_cms_input_precursor, which is a precursor to the ams.cms_input table in Python.**

## Context

### What is the Business Value of this Toy Query?
The ams.cms_input.sql query aggregates location and status data on our devices. This is required because we have different types of devices that run on different systems - our having multiple systems was an outgrowth of acquisitions (we bought a company called AccentHealth) and the organic creation of new devices (like our WiFi product, which runs on the "airtight" system). "CMS" stands for ContextMedia Systems, which is how we refer to our systems in their entirety.

### Important Notes

- **Our objective here is to reproduce the data being inserted into AMS.CMS_INPUT_HISTORY in the second statement below.**

- **The source code is production code - for this exercise, subsets of the tables have been created within the data_engineer schema in the interview database.** 

### The Query Code
The code you will be "translating" from SQL to Python is below. You may want to paste it somewhere else for easy reading, since the query is fairly long.

```sql
WITH AMS_AIRTIGHT_SOURCE_SYSTEM_NAMES AS (
    SELECT DISTINCT A.SOURCE,
                    'at'|| DENSE_RANK() OVER (ORDER BY SOURCE ASC) AS SOURCE_SYSTEM_NAME
    FROM data_engineer.airtight_sensors_history A
    WHERE A.EXPORT_DATE = (SELECT value::date FROM data_engineer.ams_rules_input WHERE rule = 'Rollforward date')
)

SELECT DISTINCT COALESCE(c.asset_tag, d.asset_tag, a.asset_tag) AS asset_tag,
       a.source_system,
       a.source_system_id,
       a.source_system_asset_name,
       COALESCE(c.product, d.product, e.product, f.product, g.product, a.product) AS product,
       CASE WHEN a.product_use IN ('Infusion Room Tablet','Rheumatology IRT') AND COALESCE(c.product, d.product, e.product, f.product, g.product, a.product) = 'Tablet' THEN a.product_use ELSE COALESCE(c.product, d.product, e.product, f.product, g.product, a.product) END AS product_use,
       a.cmh_id,
       a.device_location_status,
       a.last_pinged_at,
       COALESCE(c.wifi_mac_address,d.wifi_mac_address,d.lan_mac_address,e.wifi_mac_address,e.lan_mac_address,a.mac_address) AS mac_address,
       a.ssid,
       a.public_ip_address,
       COALESCE(c.sku, d.sku, e.sku, f.sku, g.sku, a.sku) AS sku,
       a.created_at,
       a.installed_date,
       a.deleted_at,
       a.reason_for_deletion,
       a.export_date
FROM (
    SELECT
            TRIM(UPPER(CASE WHEN d.asset_tag IS NOT NULL AND a.asset_id ~ '^A[0-9]+$' THEN a.asset_id --If the airtight name is good then use it
                    ELSE COALESCE(d.asset_tag,a.asset_id) --Else use the salesforce/airtight name. If not airtight use CMS name
                   END)) AS asset_tag
        ,   CASE WHEN cms_column = 'client_id' THEN 'mdm'
                 WHEN cms_column = 'radio_macaddress' THEN d.source_system_name
                 WHEN cms_column = 'display_unit_id' THEN 'broadsign'
            END AS source_system
        ,   CASE WHEN cms_column = 'client_id' THEN b.device_id
                 WHEN cms_column = 'radio_macaddress' THEN d.boxid
                 WHEN cms_column = 'display_unit_id' THEN c.host_id
            END AS source_system_id
        ,   a.asset_name AS source_system_asset_name
        ,   CASE  WHEN a.type IN ('Infusion Room Tablet','Rheumatology IRT') THEN 'Tablet'
                  ELSE a.type
            END AS product
        ,   a.type AS product_use
        ,   a.cmh_id
        ,   CASE WHEN a.status = 'Active' THEN 'Installed'
                 WHEN b.deleted_at IS NOT NULL THEN 'Deleted'
                 WHEN b.status = 'Demo' THEN 'Demo'
                 ELSE NULL::varchar
            END AS device_location_status
        ,   a.last_pinged_at
        ,   REPLACE(TRIM(UPPER(COALESCE(COALESCE(b.mac_address, b.missing_mac_address),c.mac_address,d.radio_macaddress))),':','') AS mac_address
        ,   b.ssid
        ,   c.public_ip AS public_ip_address
        ,   COALESCE(b.sku,c.sku,d.sku) AS sku
        ,   b.created_at
        ,   b.installed_date
        ,   b.deleted_at
        ,   CASE WHEN b.deleted_at IS NOT NULL THEN 'Deleted by membership' END::varchar(250) AS reason_for_deletion
        ,   a.export_date

    FROM data_engineer.shared_assets_history a

    LEFT JOIN (
                SELECT
                        a.*
                    ,   COALESCE(c.sku, b.sku) AS sku
                    ,   d.mac_address as missing_mac_address

                FROM data_engineer.mdm_devices_history a

                LEFT JOIN
                (
                 SELECT DISTINCT UPPER(a.asset_tag) AS ASSET_TAG,
                                 FIRST_VALUE(a.sku) OVER(PARTITION BY a.asset_tag ORDER BY a.created_date DESC ROWS UNBOUNDED PRECEDING) as sku
                   FROM data_engineer.ams_sf_assets_input a
                ) b ON UPPER(a.asset_id) = b.asset_tag

                LEFT JOIN data_engineer.shared_mdm_custom_rom_sku_mapping c
                ON a.custom_rom_version = c.custom_rom_version

                LEFT JOIN data_engineer.mdm_missing_mac_addresses d    -- Some duplicate records for assets in MDM with one of which not having a MAC address, make sure that gets assigned so can be flagged as dupe
                ON a.asset_id = d.asset_id

                WHERE a.export_date = (SELECT value::date FROM data_engineer.ams_rules_input WHERE rule = 'Rollforward date')
              ) b
         ON a.cms_id = b.client_id
        AND a.export_date = b.export_date
        AND a.cms_column = 'client_id'

    LEFT JOIN (
                SELECT
                        a.name
                    ,   a.display_unit_id
                    ,   b.poll_last_utc AS last_pinged_at
                    ,   a.host_id
                    ,   COALESCE(a.primary_mac_address,a.secondary_mac_address) AS mac_address
                    ,   b.public_ip
                    ,   COALESCE(NULLIF(c.sku,'Player'), CASE WHEN a.secondary_mac_address IS NOT NULL THEN 'P-PLA-102-NWA-01' ELSE 'P-PLA-101-NWA-01' END) AS sku --At the time of coding this was the only way to ascertain SKU from broadsign
                    ,   a.export_date

                FROM data_engineer.broadsign_hosts_history a

                LEFT JOIN data_engineer.broadsign_monitor_polls_history b
                ON a.host_id = b.client_resource_id
                AND a.export_date = b.export_date

                LEFT JOIN
                (
                 SELECT DISTINCT TRIM(UPPER(REPLACE(mac_address,':',''))) AS mac_address,
                                 FIRST_VALUE(a.sku) OVER(PARTITION BY mac_address ORDER BY created_date DESC ROWS UNBOUNDED PRECEDING) as sku
                   FROM data_engineer.ams_sf_assets_input a
                ) c ON TRIM(UPPER(REPLACE(COALESCE(a.primary_mac_address,a.secondary_mac_address),':',''))) = c.mac_address

                LEFT JOIN data_engineer.ams_broadsign_migration_devices_history d
                ON a.host_id = cast(d.bs_host_id as bigint)
                AND a.export_date = d.export_date

                WHERE d.bs_host_id IS NULL
                  AND a.export_date = (SELECT value::date FROM data_engineer.ams_rules_input WHERE rule = 'Rollforward date')
             ) c --It was a mistake to make the cms_id for broadsign display_unit_it but now I need to live with it. Therefore the below complex join is required. Once we move to ods.devices most of these joins after the FROM will not be necessary
        ON a.asset_name = c.name
        AND a.cms_id = cast(c.display_unit_id as varchar)
        AND a.export_date = c.export_date
        AND COALESCE(a.last_pinged_at,'1900-01-01') = COALESCE(c.last_pinged_at,'1900-01-01')
        AND a.cms_column = 'display_unit_id'

    LEFT JOIN (
              SELECT DISTINCT
                        FIRST_VALUE(a.boxid) OVER (PARTITION BY a.radio_macaddress, a.export_date ORDER BY a.export_hour DESC, a.radio_upsince DESC ROWS UNBOUNDED PRECEDING) AS boxid
                    ,   FIRST_VALUE(b.source_system_name) OVER (PARTITION BY a.radio_macaddress, a.export_date ORDER BY a.export_hour DESC, a.radio_upsince DESC  ROWS UNBOUNDED PRECEDING) AS source_system_name
                    ,   a.export_date
                    ,   FIRST_VALUE(a.radio_macaddress) OVER (PARTITION BY a.radio_macaddress, a.export_date ORDER BY a.export_hour DESC, a.radio_upsince DESC  ROWS UNBOUNDED PRECEDING) AS radio_macaddress
                    ,   FIRST_VALUE(c.asset_tag) OVER (PARTITION BY a.radio_macaddress, a.export_date ORDER BY a.export_hour DESC, a.radio_upsince DESC  ROWS UNBOUNDED PRECEDING) AS asset_tag
                    ,   FIRST_VALUE(COALESCE(c.sku,'P-WFI-101-MOJ-01')) OVER (PARTITION BY a.radio_macaddress, a.export_date ORDER BY a.export_hour DESC, a.radio_upsince DESC  ROWS UNBOUNDED PRECEDING) AS sku

                FROM data_engineer.airtight_sensors_history a

                INNER JOIN ams_airtight_source_system_names b
                ON a.source = b.source

                LEFT JOIN
                (
                    SELECT DISTINCT TRIM(UPPER(REPLACE(COALESCE(serial_number, mac_address),':',''))) AS mac_address,
                                    FIRST_VALUE(a.asset_tag) OVER(PARTITION BY TRIM(UPPER(REPLACE(COALESCE(serial_number, mac_address),':',''))) ORDER BY created_date DESC ROWS UNBOUNDED PRECEDING) as asset_tag,
                                    FIRST_VALUE(a.sku) OVER(PARTITION BY TRIM(UPPER(REPLACE(COALESCE(serial_number, mac_address),':',''))) ORDER BY created_date DESC ROWS UNBOUNDED PRECEDING) as sku
                      FROM data_engineer.ams_sf_assets_input a
                ) c
                ON TRIM(UPPER(REPLACE(a.radio_macaddress,':',''))) = c.mac_address

                WHERE a.export_date = (SELECT value::date FROM data_engineer.ams_rules_input WHERE rule = 'Rollforward date')
                ) d
    ON a.cms_id = d.radio_macaddress
    AND a.export_date = d.export_date
    AND a.cms_column = 'radio_macaddress'

    WHERE a.export_date = (SELECT value::date FROM data_engineer.ams_rules_input WHERE rule = 'Rollforward date')
    ) a

LEFT JOIN data_engineer.ams_cms_input_history b --Ensure this is unique by source_system, source_system_id, export_date
ON a.source_system = b.source_system
AND a.source_system_id = b.source_system_id
AND a.export_date = b.export_date

LEFT JOIN data_engineer.ams_bat_input c
ON a.mac_address = c.wifi_mac_address

LEFT JOIN data_engineer.ams_bat_input d
ON a.mac_address = d.lan_mac_address

LEFT JOIN data_engineer.ams_bat_input e
ON a.asset_tag = e.asset_tag
AND a.product = e.product
AND a.mac_address IS NULL

LEFT JOIN data_engineer.shared_sku_master f
ON a.sku = f.sku

LEFT JOIN data_engineer.shared_sku_master g
ON a.sku = g.old_sku
AND g.rollup_sku_to_use

WHERE b.source_system_id IS NULL
  and a.source_system_asset_name NOT IN ('\nT17105') -- custom addition for the exercise
```

### Input Tables
#### Airtight Tables: WiFi Router Product Tables
- **data_engineer.airtight_sensors_history:** A table containing device location and status data on our WiFi router product. Provides hourly snapshots.

#### AMS Tables: Pre-Created Tables Used Specifically for AMS
- **data_engineer.ams_bat_input:** - A “seed” table containing a list of devices and device information. Stands for “Big Ass Table”.
- **data_engineer.ams_broadsign_migration_devices_history:** Currently an empty table awaiting data for a migration - we’re converting all our Waiting Room TV products to Broadsign (a small subsegment of them were not on Broadsign previously).
- **data_engineer.ams_cms_input_history:** A table containing a historical record of daily snapshots of ams.cms_input.
- **data_engineer.ams_rules_input:** A table that contains various business parameters business users can change. In the absence of named variables (and because AMS was built from an analyst's point of view, where SQL and Redshift were the only infrastructure possibilities), this is how parameters are created for AMS.
- **data_engineer.ams_sf_assets_input:** A generated input table that contains any device information contained in Salesforce. This is primarily for obtaining data on our non-interactive devices, like our TVs.

#### Broadsign Tables: Waiting Room TV Product Tables
- **data_engineer.broadsign_hosts_history:** Device information on our Waiting Room TV products (the majority of which run on Broadsign - a content delivery platform). 
- **data_engineer.broadsign_monitor_polls_history:** Ping information on our Waiting Room TV products.

#### Mobile Device Management (MDM) Tables: Tables for Device Data on All Other Devices (Tablets and Wallboards)
- **data_engineer.mdm_devices_history:** A historical record of information on our devices - MAC address, sku, hardware info, etc.
- **data_engineer.mdm_missing_mac_addresses:** A custom table created to identify the MAC address for devices were missing them.

#### Shared Tables: Pre-Created Tables in Redshift Used for Multiple Processes, Including AMS
- **data_engineer.shared_assets_history:** A legacy table of old asset, product, and status information. We primarily use this as a “seed table” for AMS, which generates a newer version of asset, location, and status information.
- **data_engineer.shared_mdm_custom_rom_sku_mapping:** A custom mapping table that helps us determine skus appropriately.
- **data_engineer.shared_sku_master:** - Another “seed” table containing manually entered information on sku matching (this was needed to match old skus to newer ones).

### Output Tables
- **data_engineer.ams_cms_input_precursor:** A precursor to ams.cms_input, which is a cleaned up, aggregated table containing location and status information on our devices - meant for AMS consumption. For our intents and purposes, this is what gets stored in the ams.cms_input_history table as a snapshot.

## Pulling the Data
### Accessing the Input Data
For speed and optimization reasons, i'm only pulling the required data. For doing so, i went through the SQL query and identified which columns of which tables were needed, and also if there were any "WHERE" conditions which would help me reduce the final amount of data that was being downloaded.

In [1]:
import pandas as pd
import pandas.io.sql as sqlio
import psycopg2

pd.options.display.max_colwidth = 200 


conn = psycopg2.connect(host="data-interview.outcomehealth.io",
                        port="5432",
                        database="product_analytics", 
                        user="pa_candidate", 
                        password="OsOntUnDleYeTivi")

# Getting all the required data into pandas dataframes
query_airtight_sensors_history="SELECT boxid,radio_macaddress,export_date,export_hour,radio_upsince,source FROM data_engineer.airtight_sensors_history WHERE EXPORT_DATE = (SELECT value::date FROM data_engineer.ams_rules_input WHERE rule = 'Rollforward date')"
airtight_sensors_history = sqlio.read_sql_query(query_airtight_sensors_history, conn)

query_ams_rules_input='SELECT value,rule FROM data_engineer.ams_rules_input'
ams_rules_input = sqlio.read_sql_query(query_ams_rules_input,conn)

query_ams_bat_input='SELECT wifi_mac_address,lan_mac_address,asset_tag,product,sku FROM data_engineer.ams_bat_input'
ams_bat_input = sqlio.read_sql_query(query_ams_bat_input,conn)

query_shared_sku_master='SELECT sku,old_sku,rollup_sku_to_use,product FROM data_engineer.shared_sku_master' 
shared_sku_master = sqlio.read_sql_query(query_shared_sku_master,conn)

query_shared_assets_history="SELECT asset_id,asset_name,type,status,cmh_id,last_pinged_at,export_date,cms_id,cms_column FROM data_engineer.shared_assets_history WHERE export_date = (SELECT value::date FROM data_engineer.ams_rules_input WHERE rule = 'Rollforward date') AND asset_name NOT IN ('\nT17105')"
shared_assets_history = sqlio.read_sql_query(query_shared_assets_history,conn)

query_mdm_devices_history="SELECT id,asset_id,custom_rom_version,device_id,client_id,status,mac_address,ssid,created_at,installed_date,deleted_at,export_date FROM data_engineer.mdm_devices_history WHERE export_date = (SELECT value::date FROM data_engineer.ams_rules_input WHERE rule = 'Rollforward date')"
mdm_devices_history = sqlio.read_sql_query(query_mdm_devices_history,conn)

query_ams_sf_assets_input='SELECT asset_tag,sku,created_date,serial_number,mac_address FROM data_engineer.ams_sf_assets_input'
ams_sf_assets_input = sqlio.read_sql_query(query_ams_sf_assets_input,conn)

query_shared_mdm_custom_rom_sku_mapping='SELECT custom_rom_version,sku FROM data_engineer.shared_mdm_custom_rom_sku_mapping'
shared_mdm_custom_rom_sku_mapping = sqlio.read_sql_query(query_shared_mdm_custom_rom_sku_mapping,conn)

query_mdm_missing_mac_addresses='SELECT asset_id,mac_address FROM data_engineer.mdm_missing_mac_addresses'
mdm_missing_mac_addresses = sqlio.read_sql_query(query_mdm_missing_mac_addresses,conn)

query_broadsign_hosts_history="SELECT name,display_unit_id,host_id,primary_mac_address,secondary_mac_address,export_date FROM data_engineer.broadsign_hosts_history WHERE export_date = (SELECT value::date FROM data_engineer.ams_rules_input WHERE rule = 'Rollforward date')"
broadsign_hosts_history = sqlio.read_sql_query(query_broadsign_hosts_history,conn)

query_broadsign_monitor_polls_history='SELECT poll_last_utc,public_ip,client_resource_id,export_date FROM data_engineer.broadsign_monitor_polls_history'
broadsign_monitor_polls_history = sqlio.read_sql_query(query_broadsign_monitor_polls_history,conn)

query_ams_broadsign_migration_devices_history='SELECT bs_host_id,export_date FROM data_engineer.ams_broadsign_migration_devices_history WHERE bs_host_id IS NULL'
ams_broadsign_migration_devices_history = sqlio.read_sql_query(query_ams_broadsign_migration_devices_history,conn)

query_ams_cms_input_history='SELECT source_system,source_system_id,export_date FROM data_engineer.ams_cms_input_history WHERE source_system_id IS NULL'
ams_cms_input_history = sqlio.read_sql_query(query_ams_cms_input_history,conn)


### Accessing the Output Data
For accessing the output dataset, i just run the whole query replacing the **ams_airtight_source_system_names** table into the query for it's definition (inner join line 132) .

In [2]:
sql = """SELECT DISTINCT COALESCE(c.asset_tag, d.asset_tag, a.asset_tag) AS asset_tag,
       a.source_system,
       a.source_system_id,
       a.source_system_asset_name,
       COALESCE(c.product, d.product, e.product, f.product, g.product, a.product) AS product,
       CASE WHEN a.product_use IN ('Infusion Room Tablet','Rheumatology IRT') AND COALESCE(c.product, d.product, e.product, f.product, g.product, a.product) = 'Tablet' THEN a.product_use ELSE COALESCE(c.product, d.product, e.product, f.product, g.product, a.product) END AS product_use,
       a.cmh_id,
       a.device_location_status,
       a.last_pinged_at,
       COALESCE(c.wifi_mac_address,d.wifi_mac_address,d.lan_mac_address,e.wifi_mac_address,e.lan_mac_address,a.mac_address) AS mac_address,
       a.ssid,
       a.public_ip_address,
       COALESCE(c.sku, d.sku, e.sku, f.sku, g.sku, a.sku) AS sku,
       a.created_at,
       a.installed_date,
       a.deleted_at,
       a.reason_for_deletion,
       a.export_date
FROM (
    SELECT
            TRIM(UPPER(CASE WHEN d.asset_tag IS NOT NULL AND a.asset_id ~ '^A[0-9]+$' THEN a.asset_id --If the airtight name is good then use it
                    ELSE COALESCE(d.asset_tag,a.asset_id) --Else use the salesforce/airtight name. If not airtight use CMS name
                   END)) AS asset_tag
        ,   CASE WHEN cms_column = 'client_id' THEN 'mdm'
                 WHEN cms_column = 'radio_macaddress' THEN d.source_system_name
                 WHEN cms_column = 'display_unit_id' THEN 'broadsign'
            END AS source_system
        ,   CASE WHEN cms_column = 'client_id' THEN b.device_id
                 WHEN cms_column = 'radio_macaddress' THEN d.boxid
                 WHEN cms_column = 'display_unit_id' THEN c.host_id
            END AS source_system_id
        ,   a.asset_name AS source_system_asset_name
        ,   CASE  WHEN a.type IN ('Infusion Room Tablet','Rheumatology IRT') THEN 'Tablet'
                  ELSE a.type
            END AS product
        ,   a.type AS product_use
        ,   a.cmh_id
        ,   CASE WHEN a.status = 'Active' THEN 'Installed'
                 WHEN b.deleted_at IS NOT NULL THEN 'Deleted'
                 WHEN b.status = 'Demo' THEN 'Demo'
                 ELSE NULL::varchar
            END AS device_location_status
        ,   a.last_pinged_at
        ,   REPLACE(TRIM(UPPER(COALESCE(COALESCE(b.mac_address, b.missing_mac_address),c.mac_address,d.radio_macaddress))),':','') AS mac_address
        ,   b.ssid
        ,   c.public_ip AS public_ip_address
        ,   COALESCE(b.sku,c.sku,d.sku) AS sku
        ,   b.created_at
        ,   b.installed_date
        ,   b.deleted_at
        ,   CASE WHEN b.deleted_at IS NOT NULL THEN 'Deleted by membership' END::varchar(250) AS reason_for_deletion
        ,   a.export_date

    FROM data_engineer.shared_assets_history a

        LEFT JOIN (
                    SELECT
                            a.*
                        ,   COALESCE(c.sku, b.sku) AS sku
                        ,   d.mac_address as missing_mac_address

                    FROM data_engineer.mdm_devices_history a

                        LEFT JOIN (
                          
                            SELECT DISTINCT UPPER(a.asset_tag) AS ASSET_TAG,
                                            FIRST_VALUE(a.sku) OVER(PARTITION BY a.asset_tag ORDER BY a.created_date DESC ROWS UNBOUNDED PRECEDING) as sku
                            FROM data_engineer.ams_sf_assets_input a
                        ) b 
                        ON UPPER(a.asset_id) = b.asset_tag

                        LEFT JOIN data_engineer.shared_mdm_custom_rom_sku_mapping c
                        ON a.custom_rom_version = c.custom_rom_version

                        LEFT JOIN data_engineer.mdm_missing_mac_addresses d    -- Some duplicate records for assets in MDM with one of which not having a MAC address, make sure that gets assigned so can be flagged as dupe
                        ON a.asset_id = d.asset_id

                    WHERE a.export_date = (SELECT value::date FROM data_engineer.ams_rules_input WHERE rule = 'Rollforward date')
                  ) b
            ON a.cms_id = b.client_id
            AND a.export_date = b.export_date
            AND a.cms_column = 'client_id'

        LEFT JOIN (
                    SELECT
                            a.name
                        ,   a.display_unit_id
                        ,   b.poll_last_utc AS last_pinged_at
                        ,   a.host_id
                        ,   COALESCE(a.primary_mac_address,a.secondary_mac_address) AS mac_address
                        ,   b.public_ip
                        ,   COALESCE(NULLIF(c.sku,'Player'), CASE WHEN a.secondary_mac_address IS NOT NULL THEN 'P-PLA-102-NWA-01' ELSE 'P-PLA-101-NWA-01' END) AS sku --At the time of coding this was the only way to ascertain SKU from broadsign
                        ,   a.export_date

                    FROM data_engineer.broadsign_hosts_history a

                        LEFT JOIN data_engineer.broadsign_monitor_polls_history b
                        ON a.host_id = b.client_resource_id
                        AND a.export_date = b.export_date

                        LEFT JOIN
                        (
                         SELECT DISTINCT TRIM(UPPER(REPLACE(mac_address,':',''))) AS mac_address,
                                         FIRST_VALUE(a.sku) OVER(PARTITION BY mac_address ORDER BY created_date DESC ROWS UNBOUNDED PRECEDING) as sku
                           FROM data_engineer.ams_sf_assets_input a
                        ) c 
                        ON TRIM(UPPER(REPLACE(COALESCE(a.primary_mac_address,a.secondary_mac_address),':',''))) = c.mac_address

                        LEFT JOIN data_engineer.ams_broadsign_migration_devices_history d
                        ON a.host_id = cast(d.bs_host_id as bigint)
                        AND a.export_date = d.export_date

                    WHERE d.bs_host_id IS NULL AND a.export_date = (SELECT value::date FROM data_engineer.ams_rules_input WHERE rule = 'Rollforward date')
                 ) c --It was a mistake to make the cms_id for broadsign display_unit_it but now I need to live with it. Therefore the below complex join is required. Once we move to ods.devices most of these joins after the FROM will not be necessary
            ON a.asset_name = c.name
            AND a.cms_id = cast(c.display_unit_id as varchar)
            AND a.export_date = c.export_date
            AND COALESCE(a.last_pinged_at,'1900-01-01') = COALESCE(c.last_pinged_at,'1900-01-01')
            AND a.cms_column = 'display_unit_id'

        LEFT JOIN (
                SELECT DISTINCT
                            FIRST_VALUE(a.boxid) OVER (PARTITION BY a.radio_macaddress, a.export_date ORDER BY a.export_hour DESC, a.radio_upsince DESC ROWS UNBOUNDED PRECEDING) AS boxid
                        ,   FIRST_VALUE(b.source_system_name) OVER (PARTITION BY a.radio_macaddress, a.export_date ORDER BY a.export_hour DESC, a.radio_upsince DESC  ROWS UNBOUNDED PRECEDING) AS source_system_name
                        ,   a.export_date
                        ,   FIRST_VALUE(a.radio_macaddress) OVER (PARTITION BY a.radio_macaddress, a.export_date ORDER BY a.export_hour DESC, a.radio_upsince DESC  ROWS UNBOUNDED PRECEDING) AS radio_macaddress
                        ,   FIRST_VALUE(c.asset_tag) OVER (PARTITION BY a.radio_macaddress, a.export_date ORDER BY a.export_hour DESC, a.radio_upsince DESC  ROWS UNBOUNDED PRECEDING) AS asset_tag
                        ,   FIRST_VALUE(COALESCE(c.sku,'P-WFI-101-MOJ-01')) OVER (PARTITION BY a.radio_macaddress, a.export_date ORDER BY a.export_hour DESC, a.radio_upsince DESC  ROWS UNBOUNDED PRECEDING) AS sku

                FROM data_engineer.airtight_sensors_history a

                    INNER JOIN(
                    			SELECT DISTINCT A.SOURCE,
                    							'at'|| DENSE_RANK() OVER (ORDER BY SOURCE ASC) AS SOURCE_SYSTEM_NAME
    							FROM data_engineer.airtight_sensors_history A
    							WHERE A.EXPORT_DATE = (SELECT value::date FROM data_engineer.ams_rules_input WHERE rule = 'Rollforward date')) b
                    ON a.source = b.source

                    LEFT JOIN(
                          SELECT DISTINCT TRIM(UPPER(REPLACE(COALESCE(serial_number, mac_address),':',''))) AS mac_address,
                                        FIRST_VALUE(a.asset_tag) OVER(PARTITION BY TRIM(UPPER(REPLACE(COALESCE(serial_number, mac_address),':',''))) ORDER BY created_date DESC ROWS UNBOUNDED PRECEDING) as asset_tag,
                                        FIRST_VALUE(a.sku) OVER(PARTITION BY TRIM(UPPER(REPLACE(COALESCE(serial_number, mac_address),':',''))) ORDER BY created_date DESC ROWS UNBOUNDED PRECEDING) as sku
                          FROM data_engineer.ams_sf_assets_input a
                            ) c
                    ON TRIM(UPPER(REPLACE(a.radio_macaddress,':',''))) = c.mac_address

                WHERE a.export_date = (SELECT value::date FROM data_engineer.ams_rules_input WHERE rule = 'Rollforward date')

                   ) d 
            ON a.cms_id = d.radio_macaddress
            AND a.export_date = d.export_date
            AND a.cms_column = 'radio_macaddress'

    WHERE a.export_date = (SELECT value::date FROM data_engineer.ams_rules_input WHERE rule = 'Rollforward date')
    ) a

    LEFT JOIN data_engineer.ams_cms_input_history b --Ensure this is unique by source_system, source_system_id, export_date
    ON a.source_system = b.source_system
    AND a.source_system_id = b.source_system_id
    AND a.export_date = b.export_date

    LEFT JOIN data_engineer.ams_bat_input c
    ON a.mac_address = c.wifi_mac_address

    LEFT JOIN data_engineer.ams_bat_input d
    ON a.mac_address = d.lan_mac_address

    LEFT JOIN data_engineer.ams_bat_input e
    ON a.asset_tag = e.asset_tag
    AND a.product = e.product
    AND a.mac_address IS NULL

    LEFT JOIN data_engineer.shared_sku_master f
    ON a.sku = f.sku

    LEFT JOIN data_engineer.shared_sku_master g
    ON a.sku = g.old_sku
    AND g.rollup_sku_to_use

WHERE b.source_system_id IS NULL and a.source_system_asset_name NOT IN ('\nT17105')"""
output_data = sqlio.read_sql_query(sql, conn)
output_data.head()

Unnamed: 0,asset_tag,source_system,source_system_id,source_system_asset_name,product,product_use,cmh_id,device_location_status,last_pinged_at,mac_address,ssid,public_ip_address,sku,created_at,installed_date,deleted_at,reason_for_deletion,export_date
0,505,broadsign,,0505 - Aopen Retired 10/28/13,Waiting Room Screen,Waiting Room Screen,,,2013-05-01 16:43:38,,,,,NaT,,NaT,,2019-03-31
1,516,broadsign,,0516-1 (WIll be MIA often) - lost/stolen,Waiting Room Screen,Waiting Room Screen,,,2008-10-06 13:20:37,,,,,NaT,,NaT,,2019-03-31
2,10005,broadsign,,0501-1_10005,Waiting Room Screen,Waiting Room Screen,501.0,,2008-06-26 19:44:16,00018068E25D,,,P-PLA-101-NWA-01,NaT,,NaT,,2019-03-31
3,10016,broadsign,,5056-2_10016 - MS /66856/ - Returning 03/18/2016,Waiting Room Screen,Waiting Room Screen,5056.0,,2015-11-20 02:30:47,,,,,NaT,,NaT,,2019-03-31
4,10021,broadsign,,30180-1_10021 - Combo,Waiting Room Screen,Waiting Room Screen,30180.0,,2015-04-17 15:00:22,,,,,NaT,,NaT,,2019-03-31


## Processing the Data
**Breaking the query down into smaller pieces**

The first thing i do is to create the tables that we are going to use later on to do the left joins needed to fill in the final table. These tables are called b,c and d, that together they conform a.

First things first, getting b:


In [None]:
b= """SELECT
                            a.*
                        ,   COALESCE(c.sku, b.sku) AS sku
                        ,   d.mac_address as missing_mac_address

                    FROM data_engineer.mdm_devices_history a

                        LEFT JOIN (
                          
                            SELECT DISTINCT UPPER(a.asset_tag) AS ASSET_TAG,
                                            FIRST_VALUE(a.sku) OVER(PARTITION BY a.asset_tag ORDER BY a.created_date DESC ROWS UNBOUNDED PRECEDING) as sku
                            FROM data_engineer.ams_sf_assets_input a
                        ) b 
                        ON UPPER(a.asset_id) = b.asset_tag

                        LEFT JOIN data_engineer.shared_mdm_custom_rom_sku_mapping c
                        ON a.custom_rom_version = c.custom_rom_version

                        LEFT JOIN data_engineer.mdm_missing_mac_addresses d    -- Some duplicate records for assets in MDM with one of which not having a MAC address, make sure that gets assigned so can be flagged as dupe
                        ON a.asset_id = d.asset_id

                    WHERE a.export_date = (SELECT value::date FROM data_engineer.ams_rules_input WHERE rule = 'Rollforward date')"""

In [3]:
#
ams_sf_assets_input.sort_values(['created_date'],ascending=False,inplace=True)
b=ams_sf_assets_input.groupby('asset_tag')[['sku']].first().reset_index()
b['asset_tag']=b['asset_tag'].str.upper()


mdm_devices_history['asset_id'] = mdm_devices_history['asset_id'].str.upper()
s1 = pd.merge(mdm_devices_history, b, how='left', left_on=['asset_id'],right_on=['asset_tag'],suffixes=('_a','_b'))
s2 = pd.merge(s1, shared_mdm_custom_rom_sku_mapping, how='left', left_on=a['custom_rom_version'],right_on=['custom_rom_version'],suffixes=('','_c'))
s3 = pd.merge(s2, d, how='left', left_on=a['asset_id'],right_on=['asset_id'],suffixes=('','_d'))

b=pd.concat([s3[list(mdm_devices_history.columns)],s3.sku.combine_first(s3.sku_c)],axis=1)
b['missing_mac_address']=s3['mac_address_d']

## Output Validation
For validating that every value in the created table is identical to the output table.

## Unit Test Implementation
We ask that you implement at least one unit test to catch errors with your script.