In [1]:
import pandas as pd 
from dotenv import load_dotenv
import snowflake.connector
import os
import jupyter_black

jupyter_black.load()

In [2]:
# loads .env file and stores as environmental variables
# this is not added to version control, passwords should not be stored in github
load_dotenv()

True

In [3]:
# class to connect to snowflake and run queries
class Snowflake_Query:
    def __init__(self):
        self.user = os.getenv("snowflake_user")
        self.password = os.getenv("snowflake_password")
        self.account = os.getenv("snowflake_account")
        self.database = os.getenv("snowflake_database")
        self.warehouse = os.getenv("snowflake_warehouse")
        self.con = None

        if not all(
            [self.user, self.password, self.account, self.database, self.warehouse]
        ):
            raise ValueError("Missing environment variables for Snowflake connection")

    def connect(self):
        try:
            self.con = snowflake.connector.connect(
                user=self.user,
                password=self.password,
                account=self.account,
                database=self.database,
                warehouse=self.warehouse,
            )
        except Exception as e:
            raise ConnectionError(f"Failed to connect to Snowflake: {e}")

    def run_query(self, query):
        if not self.con:
            raise ConnectionError("Connection to Snowflake is not established")

        try:
            cs = self.con.cursor()
            cs.execute(query)
            df = cs.fetch_pandas_all()
        except Exception as e:
            raise RuntimeError(f"Failed to execute query: {e}")
        finally:
            cs.close()

        return df

    def close_con(self):
        if self.con:
            self.con.close()

In [4]:
promt_1_query = """
WITH CTE AS (
    SELECT
        PROPLOC AS "Property Location",
        RP1NBRCDE AS "Assessor Neighborhood Code",
        NC."district" AS "Assessor Neighborhood District",
        NC."neighborhood" AS "Assessor Neighborhood", 
        REPLACE(SPLIT(RP1PRCLID, ' ')[0], '"', '') AS "Block",
        REPLACE(SPLIT(RP1PRCLID, ' ')[1], '"', '') AS "Lot",
        CONCAT(SPLIT(RP1PRCLID, ' ')[0], SPLIT(RP1PRCLID, ' ')[1] ) as "Parcel Number",
        RP1VOLUME AS "Volume Number",
        RP1CLACDE AS "Property Class Code",
        PC."class_definition",
        PC."use_code",
        PC."use_definition",
        YRBLT AS "Year Property Built",
        BATHS AS "Number of Bathrooms",
        BEDS AS "Number of Bedrooms",
        ROOMS AS "Number of Rooms",
        STOREYNO AS "Number of Stories",
        UNITS AS "Number of Units",
        ZONE AS "Zoning Code",
        CONSTTYPE AS "Construction Type",
        DEPTH AS "Lot Depth",
        FRONT AS "Lot Frontage",
        SQFT AS "Property Area",
        FBA AS "Basement Area",
        LAREA AS "Lot Area",
        LOTCODE AS "Lot Code",
        REPRISDATE AS "Prior Sales Date",
        RP1TRACDE AS "Tax Rate Area Code",
        OWNRPRCNT AS "Percent of Ownership",
        EXEMPTYPE AS "Exemption Code",
        EC."exemption_definition",
        RP1STACDE AS "Status Code",
        RP1EXMVL2 AS "Misc Exemption Value",
        RP1EXMVL1 AS "Homeowner Exemption Value",
        ROLLYEAR AS "Closed Roll Year",
        RECURRSALD AS "Current Sales Date",
        RP1FXTVAL AS "Assessed Fixtures Value",
        RP1IMPVAL AS "Assessed Improvement Value",
        RP1LNDVAL AS "Assessed Land Value",
        RP1PPTVAL AS "Assessed Personal Property Value"
    FROM(
    SELECT *
    FROM TAKE_HOME_USER2.ASR.ASR_2019

    UNION ALL

    SELECT *
    FROM TAKE_HOME_USER2.ASR.ASR_2018

    UNION ALL

    SELECT *
    FROM TAKE_HOME_USER2.ASR.ASR_2017) as asr
    LEFT JOIN TAKE_HOME_USER2.ASR.EXEMPTION_CODES EC ON asr.EXEMPTYPE = EC."exemption_code"
    LEFT JOIN TAKE_HOME_USER2.ASR.NEIGHBORHOOD_CODES NC ON asr.RP1NBRCDE = NC."code"
    LEFT JOIN TAKE_HOME_USER2.ASR.PROPERTY_CODES PC ON asr.RP1CLACDE = PC."class_code"

)
SELECT
    "Closed Roll Year",
    "Property Location",
    "Parcel Number",
    "Block",
    "Lot",
    "Volume Number",
    "use_code",
    "use_definition",
    "Property Class Code",
    "class_definition",
    "Year Property Built",
    "Number of Bathrooms",
    "Number of Bedrooms",
    "Number of Rooms",
    "Number of Stories",
    "Number of Units",
    "Zoning Code",
    "Construction Type",
    "Lot Depth",
    "Lot Frontage",
    "Property Area",
    "Basement Area",
    "Lot Area",
    "Lot Code",
    "Tax Rate Area Code",
    "Percent of Ownership",
    "Exemption Code",
    "exemption_definition"
    "Status Code",
    "Misc Exemption Value",
    "Homeowner Exemption Value",
    "Current Sales Date",
    "Assessed Fixtures Value",
    "Assessed Improvement Value",
    "Assessed Land Value",
    "Assessed Personal Property Value"
    "Assessor Neighborhood Code",
    "Assessor Neighborhood District",
    "Assessor Neighborhood",
    P."supervisor_district" AS "Supervisor District",
    P."analysis_neighborhood" AS "Analysis Neighborhood",
    p."the_geom"
FROM CTE
LEFT JOIN TAKE_HOME_USER2.ASR.PARCELS P ON CTE."Parcel Number" = P."parcel_number"
"""

In [5]:
# initialize snowflake query class and connect
snowflake_query = Snowflake_Query()
snowflake_query.connect()

In [6]:
# run query and return results as dataframe
promt_1_df = snowflake_query.run_query(query=promt_1_query)

In [7]:
# check output of query
promt_1_df.head()

Unnamed: 0,Closed Roll Year,Property Location,Parcel Number,Block,Lot,Volume Number,use_code,use_definition,Property Class Code,class_definition,...,Current Sales Date,Assessed Fixtures Value,Assessed Improvement Value,Assessed Land Value,Assessor Neighborhood Code,Assessor Neighborhood District,Assessor Neighborhood,Supervisor District,Analysis Neighborhood,the_geom
0,19,0000 3001 HYDE ST0000,1001,1,1,1,COMM,Commercial Misc,G,Garages (Commercial),...,0,0,0,0,0,,,2.0,Russian Hill,POINT (-122.421556695858 37.808657558421)
1,19,0000 3002 HYDE ST0000,2001,2,1,1,COMM,Commercial Misc,G,Garages (Commercial),...,0,0,0,0,0,,,2.0,Russian Hill,POINT (-122.420498649251 37.808729641773)
2,19,0000 0160 JEFFERSON ST0000,4002,4,2,1,MISC,Miscellaneous/Mixed-Use,VCI,Vacant Lot Comm and Ind,...,0,0,0,0,0,,,3.0,North Beach,POINT (-122.41508327985201 37.808508251741)
3,19,0000 0286 JEFFERSON ST0000,5001,5,1,1,COMM,Commercial Misc,G,Garages (Commercial),...,0,0,0,0,0,,,3.0,North Beach,POINT (-122.41667952143501 37.808403377461)
4,19,0366 0350 JEFFERSON ST0000,6001,6,1,1,COMM,Commercial Misc,G,Garages (Commercial),...,0,0,0,0,0,,,3.0,North Beach,POINT (-122.41834725814999 37.808315455774)


In [8]:
# 634,351 rows and 40 columns
promt_1_df.shape

(634351, 40)

In [9]:
# parse JSON with flatten function to generate dataset for processing
promt_2_query = """
select 
    value:container_id::string as container_id,
    value:fill_level_percentage::float as fill_level_percentage,
    value:timestamp::timestamp as timestamp
from TAKE_HOME_USER2.SENSOR.TIMESERIES,
LATERAL FLATTEN(input => V:data:data)
"""

In [10]:
# run query and return results as dataframe
promt_2_df = snowflake_query.run_query(query=promt_2_query)

In [11]:
# check output of query
promt_2_df.head()

Unnamed: 0,CONTAINER_ID,FILL_LEVEL_PERCENTAGE,TIMESTAMP
0,04ba0e70-7ff8-11e9-9c59-204e9b6fc128,2.0,2019-06-18 15:32:00
1,04ba0e70-7ff8-11e9-9c59-204e9b6fc128,1.0,2019-06-18 15:17:00
2,04ba0e70-7ff8-11e9-9c59-204e9b6fc128,24.0,2019-06-18 15:02:00
3,04ba0e70-7ff8-11e9-9c59-204e9b6fc128,23.0,2019-06-18 14:47:00
4,04ba0e70-7ff8-11e9-9c59-204e9b6fc128,23.0,2019-06-18 14:32:00


In [12]:
# function to calculate alerts for each container
# iterates over dataframe and checks if fill percentage is above 80 4 times in a row
# creates dictionary for the alert
# checks if fill percentage goes above 100 3 times in a row and updates dictionary
# checks if fill percentage is below 20 4 times in a row and updates dictionary
# returns list of dictionaries
def calculate_alerts(subset_df):
    above_80_count = 0
    above_100_count = 0
    below_20_count = 0
    alert_triggered = None
    alerts = []
    for idx, row in subset_df.iterrows():
        if not alert_triggered:
            if row["FILL_LEVEL_PERCENTAGE"] > 80.0:
                above_80_count += 1
                if above_80_count == 4:
                    alert_triggered = True
                    container_dict = {
                        "container_id": row["CONTAINER_ID"],
                        "alert_triggered_time": row["TIMESTAMP"],
                        "serviced_time": None,
                        "pick_up_time_hrs": None,
                        "overflow": 0,
                    }
                    above_80_count = 0
            else:
                above_80_count = 0
        else:
            if row["FILL_LEVEL_PERCENTAGE"] > 100.0:
                above_100_count += 1
                if above_100_count == 3:
                    container_dict["overflow"] = 1
            elif row["FILL_LEVEL_PERCENTAGE"] < 20.0:
                below_20_count += 1
                if below_20_count == 4:
                    container_dict["serviced_time"] = row["TIMESTAMP"]
                    container_dict["pick_up_time_hrs"] = round(
                        (
                            container_dict["serviced_time"]
                            - container_dict["alert_triggered_time"]
                        ).total_seconds()
                        / 3600,
                        2,
                    )
                    alerts.append(container_dict)
                    alert_triggered = None
                    above_100_count = 0
                    below_20_count = 0
            else:
                above_100_count = 0
                below_20_count = 0
    if alert_triggered:
        alerts.append(container_dict)
    return alerts

In [13]:
# main function to generate aler dataset
# creates array of unique container ids
# creates subset dataframe for each unique container
# calls calculate_alerts function and returns list of alerts for each container
# if list is not empty, merges list into final alerts list
# returns dataframe of final alerts
def generate_alert_dataset(df):
    final_alerts = []
    container_ids = df["CONTAINER_ID"].unique()
    for container_id in container_ids:
        container_subset = (
            df[df["CONTAINER_ID"] == container_id]
            .sort_values("TIMESTAMP")
            .copy()
            .reset_index(drop=True)
        )
        alerts_per_conatiner_id = calculate_alerts(subset_df=container_subset)
        if alerts_per_conatiner_id:
            final_alerts.extend(alerts_per_conatiner_id)
    return pd.DataFrame(final_alerts)

In [14]:
# generates alerts from processed JSON
alerts_dataset = generate_alert_dataset(df=promt_2_df)

In [15]:
# view output
alerts_dataset.head()

Unnamed: 0,container_id,alert_triggered_time,serviced_time,pick_up_time_hrs,overflow
0,04ba0e70-7ff8-11e9-9c59-204e9b6fc128,2019-06-19 13:53:34,2019-06-21 17:27:22,51.56,0
1,04ba0e70-7ff8-11e9-9c59-204e9b6fc128,2019-06-23 21:39:13,2019-06-24 16:08:59,18.5,1
2,087c58c0-7a75-11e9-b5c2-f5836e79b9a7,2019-06-19 16:06:44,2019-06-20 00:46:53,8.67,0
3,087c58c0-7a75-11e9-b5c2-f5836e79b9a7,2019-06-21 22:26:24,2019-06-22 15:29:17,17.05,0
4,087c58c0-7a75-11e9-b5c2-f5836e79b9a7,2019-06-23 02:36:47,2019-06-24 15:54:24,37.29,1


In [16]:
# close connection to snowflake
snowflake_query.close_con()

In [17]:
# save outputs and compress
promt_1_df.to_csv("output_1.csv.zip", compression="zip")
alerts_dataset.to_csv("output_2.csv.zip", compression="zip")