In [1]:
import pandas as pd
import numpy as np
import json
import dask.dataframe as dd
import requests
from io import StringIO
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from math import ceil
import datetime
import calendar

In [2]:
#Azure Functions
def azure_upload_blob(connect_str, container_name, blob_name, data):
  blob_service_client = BlobServiceClient.from_connection_string(connect_str)
  blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
  blob_client.upload_blob(data, overwrite = True)
  print(f"Uploaded to Azure Blob: {blob_name}.")

def azure_download_blob(connect_str, container_name, blob_name):
  blob_service_client = BlobServiceClient.from_connection_string(connect_str)
  blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
  download_stream = blob_client.download_blob()
  return download_stream.readall()

#Google Cloud Functions
def google_upload_blob(bucket_name, source_file_name, destination_blob_name):
  storage_client = storage.Client()
  bucket = storage_client.bucket(bucket_name)
  blob = bucket.blob(destination_blob_name)
  blob.upload_from_filename(source_file_name)
  print(f"FIle {source_file_name} uploaded to {destination_blob_name}.")

def google_download_blob(bucket_name, source_blob_name, destination_file_name):
  storage_client = storage.Client()
  bucket = storage_client.bucket(bucket_name)
  blob = bucket.blob(source_blob_name)
  blob.download_to_filename(destination_file_name)
  print(f"BLob {source_blob_name} downloaded to {destination_file_name}.")

#AWS Functions
def aws_upload_file(file_name, bucket, object_name = None):
  if object_name is None:
    object_name = os.path.basename(file_name)
  s3_client = boto3.client('s3')
  response = s3_client.upload_file(file_name, bucket, object_name)
  print(f"Uploaded {file_name} to S3 bucket {bucket}.")

def aws_download_file(bucket, object_name, file_name):
  s3_client = boto3.client('s3')
  s3_client.download_file(bucket, object_name, file_name)
  print(f"Downloaded {object_name} from S3 bucket {bucket}.")

In [3]:
def week_of_month(dt):
  first_day = dt.replace(day = 1)
  dom = dt.day
  adjusted_dom = dom + first_day.weekday()
  return int(ceil(adjusted_dom/7.0))

def get_week_of_year(date_str):
  date = datetime.strptime(date_str, '%Y-%m-%d')

  week_of_year = date.isocalendar()[1]

  return week_of_year

In [23]:
# Path to the JSON configuration file
config_file_path = 'config.json'

# Open the configuration file and load the JSON data into the config variable
with open(config_file_path, 'r') as config_file:
    config = json.load(config_file)

# Retrieve the Azure Storage connection string from the loaded configuration
CONNECTION_STRING_AZURE_STORAGE = config["connectionString"]
CONTAINER_AZURE = 'waterconsumption'

# Initialize BlobServiceClient using connection string
blob_service_client = BlobServiceClient.from_connection_string(CONNECTION_STRING_AZURE_STORAGE)

# Get a blob client to interact with a specific blob
#blob_client = blob_service_client.get_blob_client(container=CONTAINER_AZURE, blob=blob_name)

# Create the container
container_client = blob_service_client.get_container_client(CONTAINER_AZURE)

consumption_water_df = pd.DataFrame()

blob_list = container_client.list_blobs()
for blob in blob_list:
  print(blob.name)
  blob_client = container_client.get_blob_client(blob = blob.name)
  blob_data = blob_client.download_blob()
  blob_content = blob_data.readall().decode('utf-8')
  df = pd.read_csv(StringIO(blob_content))

  print(df.shape)

  consumption_water_df = df.copy()

water_consumption.csv
(50315, 25)


In [6]:
consumption_water_df.columns

Index(['Development Name', 'Borough', 'Account Name', 'Location', 'Meter AMR',
       'Meter Scope', 'TDS #', 'EDP', 'RC Code', 'Funding Source', 'AMP #',
       'Vendor Name', 'UMIS BILL ID', 'Revenue Month', 'Service Start Date',
       'Service End Date', '# days', 'Meter Number', 'Estimated',
       'Current Charges', 'Rate Class', 'Bill Analyzed', 'Consumption (HCF)',
       'Water&Sewer Charges', 'Other Charges'],
      dtype='object')

In [7]:
consumption_water_df.head()

Unnamed: 0,Development Name,Borough,Account Name,Location,Meter AMR,Meter Scope,TDS #,EDP,RC Code,Funding Source,...,Service End Date,# days,Meter Number,Estimated,Current Charges,Rate Class,Bill Analyzed,Consumption (HCF),Water&Sewer Charges,Other Charges
0,BAISLEY PARK,QUEENS,BAISLEY PARK,BLD 09,AMR,BLD 09,91.0,240,Q009100,FEDERAL,...,01/26/2020,34.0,K13060723,N,196.35,Basic Water and Sewer,Yes,19,196.35,0.0
1,BAISLEY PARK,QUEENS,BAISLEY PARK,BLD 09,AMR,BLD 09,91.0,240,Q009100,FEDERAL,...,02/24/2020,29.0,K13060723,N,258.35,Basic Water and Sewer,Yes,25,258.35,0.0
2,BAISLEY PARK,QUEENS,BAISLEY PARK,BLD 09,AMR,BLD 09,91.0,240,Q009100,FEDERAL,...,03/23/2020,28.0,K13060723,N,217.02,Basic Water and Sewer,Yes,21,217.02,0.0
3,BAISLEY PARK,QUEENS,BAISLEY PARK,BLD 09,AMR,BLD 09,91.0,240,Q009100,FEDERAL,...,04/23/2020,31.0,K13060723,N,103.34,Basic Water and Sewer,Yes,10,103.34,0.0
4,BAY VIEW,BROOKLYN,BAY VIEW,BLD 25 - Community Center,NONE,Community Center,92.0,670,K209200,MIXED FINANCE/LLC1,...,01/26/2020,34.0,E17250205,N,72.34,Basic Water and Sewer,Yes,7,72.34,0.0


In [8]:
consumption_water_df.dropna()
consumption_water_df['TDS #'] = consumption_water_df['TDS #'].astype('Int64')
consumption_water_df['# days'] = consumption_water_df['# days'].astype('Int64')
consumption_water_df.head()

Unnamed: 0,Development Name,Borough,Account Name,Location,Meter AMR,Meter Scope,TDS #,EDP,RC Code,Funding Source,...,Service End Date,# days,Meter Number,Estimated,Current Charges,Rate Class,Bill Analyzed,Consumption (HCF),Water&Sewer Charges,Other Charges
0,BAISLEY PARK,QUEENS,BAISLEY PARK,BLD 09,AMR,BLD 09,91,240,Q009100,FEDERAL,...,01/26/2020,34,K13060723,N,196.35,Basic Water and Sewer,Yes,19,196.35,0.0
1,BAISLEY PARK,QUEENS,BAISLEY PARK,BLD 09,AMR,BLD 09,91,240,Q009100,FEDERAL,...,02/24/2020,29,K13060723,N,258.35,Basic Water and Sewer,Yes,25,258.35,0.0
2,BAISLEY PARK,QUEENS,BAISLEY PARK,BLD 09,AMR,BLD 09,91,240,Q009100,FEDERAL,...,03/23/2020,28,K13060723,N,217.02,Basic Water and Sewer,Yes,21,217.02,0.0
3,BAISLEY PARK,QUEENS,BAISLEY PARK,BLD 09,AMR,BLD 09,91,240,Q009100,FEDERAL,...,04/23/2020,31,K13060723,N,103.34,Basic Water and Sewer,Yes,10,103.34,0.0
4,BAY VIEW,BROOKLYN,BAY VIEW,BLD 25 - Community Center,NONE,Community Center,92,670,K209200,MIXED FINANCE/LLC1,...,01/26/2020,34,E17250205,N,72.34,Basic Water and Sewer,Yes,7,72.34,0.0


In [9]:
# Perform data transformation
consumption_water_df['Service Start Date'] = pd.to_datetime(consumption_water_df['Service Start Date'])
consumption_water_df['Service End Date'] = pd.to_datetime(consumption_water_df['Service End Date'])
consumption_water_df['Year'] = consumption_water_df['Service Start Date'].dt.year
consumption_water_df['Quarter'] = consumption_water_df['Service Start Date'].dt.quarter
consumption_water_df['Month'] = consumption_water_df['Service Start Date'].dt.month
consumption_water_df['Day'] = consumption_water_df['Service Start Date'].dt.day
consumption_water_df['WeekOfYear'] = consumption_water_df['Service Start Date'].dt.isocalendar().week
#consumption_water_df['WeekOfMonth'] = consumption_water_df['Service Start Date'].apply(week_of_month)

# Drop null values
consumption_water_df.dropna(inplace=True)

# Data quality checks (e.g., check for duplicates)
duplicates = consumption_water_df[consumption_water_df.duplicated()]
if not duplicates.empty:
    print("Duplicate rows found!")

Duplicate rows found!


In [10]:
# Define the data mapping for each dimension and fact table
# Dim_Vendor
vendor_mapping = {
    'VendorName': {'source_column': 'Vendor Name', 'destination_column': 'VendorName', 'data_type': 'string', 'description': 'Utility vendor name'},
}

# Dim_Development
development_mapping = {
    'DevelopmentName': {'source_column': 'Development Name', 'destination_column': 'DevelopmentName', 'data_type': 'string', 'description': 'The name of the housing development as listed in the Development Data Book'},
    'BuildingNumber': {'source_column': 'Location', 'destination_column': 'BuildingNumber', 'data_type': 'string', 'description': 'Building number'},
    'ElectronicDataProcessingNumber': {'source_column': 'EDP', 'destination_column': 'ElectronicDataProcessingNumber', 'data_type': 'integer', 'description': 'NYCHA Electronic Data Processing. Number used to identify individual NYCHA developments.'},
    'RC_Code': {'source_column': 'RC Code', 'destination_column': 'RC_Code', 'data_type': 'string', 'description': 'NYCHA budget responsibility code.'},
    'FundingSource': {'source_column': 'Funding Source', 'destination_column': 'FundingSource', 'data_type': 'string', 'description': 'The development’s funding source including Federal, Mixed Finance, or an indication that the facility is a non development facility'},
}

# Dim_Meter
meter_mapping = {
    'MeterNumber': {'source_column': 'Meter Number', 'destination_column': 'MeterNumber', 'data_type': 'string', 'description': 'Meter number'},
    'MeterAMR': {'source_column': 'Meter AMR', 'destination_column': 'MeterAMR', 'data_type': 'string', 'description': 'Is the meter Automatic Meter Reading (AMR), Interval or none'},
    'MeterScope': {'source_column': 'Meter Scope', 'destination_column': 'MeterScope', 'data_type': 'string', 'description': 'The buildings or areas the account and meter supply'},
}

# Dim_Location
location_mapping = {
    'BoroughName': {'source_column': 'Borough', 'destination_column': 'BoroughName', 'data_type': 'string', 'description': 'Borough'},
    'Zone': {'source_column': 'Location', 'destination_column': 'Zone', 'data_type': 'string', 'description': 'Zone'},
    # Add Longitude and Latitude mapping if available
}

# Dim_BillingCycle
billing_cycle_mapping = {
    'RevenueMonth': {'source_column': 'Revenue Month', 'destination_column': 'RevenueMonth', 'data_type': 'date', 'description': 'Year and month of bill: 2016-01'},
    'ServiceStartDate': {'source_column': 'Service Start Date', 'destination_column': 'ServiceStartDate', 'data_type': 'date', 'description': 'Bill start date'},
    'ServiceEndDate': {'source_column': 'Service End Date', 'destination_column': 'ServiceEndDate', 'data_type': 'date', 'description': 'Bill end date'},
    'NumberOfDays': {'source_column': 'number days', 'destination_column': 'NumberOfDays', 'data_type': 'integer', 'description': 'Number of days on bill'},
}

# Facts_ConsumerWaterUsage
consumer_water_usage_mapping = {
    'ConsumptionVolume': {'source_column': 'Consumption (HCF)', 'destination_column': 'ConsumptionVolume', 'data_type': 'float', 'description': 'Total HCF (Hundred Cubic Feet) consumption'},
    'WaterConsumptionCost': {'source_column': 'Water&Sewer Charges', 'destination_column': 'WaterConsumptionCost', 'data_type': 'float', 'description': 'Total water & sewer charges'},
    'TotalUtilityCost': {'source_column': 'Current Charges', 'destination_column': 'TotalUtilityCost', 'data_type': 'float', 'description': 'Total costs'},
    # Add other fields as needed
}

# Merge all mappings
all_mappings = {
    'Dim_Vendor': vendor_mapping,
    'Dim_Development': development_mapping,
    'Dim_Meter': meter_mapping,
    'Dim_Location': location_mapping,
    'Dim_BillingCycle': billing_cycle_mapping,
    'Facts_ConsumerWaterUsage': consumer_water_usage_mapping,
}


In [11]:
#dim_vendor
unique_vendor_names = consumption_water_df['Vendor Name'].unique()

# Convert the array of unique values into a dataframe
unique_vendor_df = pd.DataFrame(unique_vendor_names, columns=['VendorName'])

# Filtering out any NA values if necessary
unique_vendor_df = unique_vendor_df.dropna(subset=['VendorName'])

# Display the resulting dataframe
unique_vendor_df

Unnamed: 0,VendorName
0,NEW YORK CITY WATER BOARD


In [12]:
#dim_development
unique_development_names = consumption_water_df['Development Name'].unique()
unique_development_df = pd.DataFrame(unique_development_names, columns=['DevelopmentName'])
unique_development_df['BuildingNumber'] = consumption_water_df['Location']
unique_development_df['ElectronicDataProcessingNumber'] = consumption_water_df['EDP']
unique_development_df['RC_Code'] = consumption_water_df['RC Code']
unique_development_df['FundingSource'] = consumption_water_df['Funding Source']

# Applying the mapping to create a new column with descriptions
for column in ['DevelopmentName', 'BuildingNumber', 'ElectronicDataProcessingNumber', 'RC_Code', 'FundingSource']:
    mapping_info = development_mapping[column]
    
# Filtering out any NA values if necessary
unique_development_df = unique_development_df.dropna(subset=['BuildingNumber'])

# Display the resulting dataframe
unique_development_df

Unnamed: 0,DevelopmentName,BuildingNumber,ElectronicDataProcessingNumber,RC_Code,FundingSource
0,BAISLEY PARK,BLD 09,240.0,Q009100,FEDERAL
1,BAY VIEW,BLD 09,240.0,Q009100,FEDERAL
2,BERRY STREET-SOUTH 9TH STREET,BLD 09,240.0,Q009100,FEDERAL
3,CLAREMONT REHAB (GROUP 2),BLD 09,240.0,Q009100,FEDERAL
4,CONLON LIHFE TOWER,BLD 25 - Community Center,670.0,K209200,MIXED FINANCE/LLC1
5,ARMSTRONG I,BLD 25 - Community Center,670.0,K209200,MIXED FINANCE/LLC1
6,FENIMORE-LEFFERTS,BLD 25 - Community Center,670.0,K209200,MIXED FINANCE/LLC1
7,FORT WASHINGTON AVENUE REHAB,BLD 25 - Community Center,670.0,K209200,MIXED FINANCE/LLC1
8,GOMPERS,BLD 02,777.0,K035700,FEDERAL
9,HOWARD AVENUE-PARK PLACE,BLD 02,777.0,K035700,FEDERAL


In [24]:
# Dim Meter
unique_meter_numbers = consumption_water_df['Meter Number'].unique()
unique_meter_df = pd.DataFrame(unique_meter_numbers, columns=['MeterNumber'])
unique_meter_df['MeterAMR'] = consumption_water_df['Meter AMR']
unique_meter_df['MeterScope'] = consumption_water_df['Meter Scope']

# Applying the mapping to create a new column with descriptions
for column in ['MeterAMR', 'MeterScope']:
    mapping_info = meter_mapping[column]

# Filtering out any NA values if necessary
unique_meter_df = unique_meter_df.dropna(subset=['MeterAMR'])

# Display the resulting dataframe
unique_meter_df

Unnamed: 0,MeterNumber,MeterAMR,MeterScope
0,K13060723,AMR,BLD 09
1,E17250205,AMR,BLD 09
2,O78363626,AMR,BLD 09
3,O78779274,AMR,BLD 09
4,O78363667,NONE,Community Center
...,...,...,...
790,15020283,AMR,"BLD 02, STORE 04-05"
791,78778861,AMR,"BLD 02, STORE 04-05"
792,74892806,AMR,"BLD 02, STORE 04-05"
793,9010020,AMR,"BLD 01, STORE 01-03"


In [16]:
# Dim_Location
unique_locations = consumption_water_df['Location'].unique()
unique_location_df = pd.DataFrame(unique_locations, columns=['LocationID'])
unique_location_df['BoroughName'] = consumption_water_df['Borough']
# Assuming Longitude, and Latitude are available in the dataset
#unique_location_df['Longitude'] = consumption_water_df['Longitude']
#unique_location_df['Latitude'] = consumption_water_df['Latitude']

# Applying the mapping to create a new column with descriptions
for column in ['BoroughName']:
    mapping_info = location_mapping[column]
   

# Filtering out any NA values if necessary
unique_location_df = unique_location_df.dropna(subset=['BoroughName'])

# Display the resulting dataframe
unique_location_df

Unnamed: 0,LocationID,BoroughName
0,BLD 09,QUEENS
1,BLD 25 - Community Center,QUEENS
2,BLD 02,QUEENS
3,BLD 04,QUEENS
4,BLD 03,BROOKLYN
5,BLD 01,BROOKLYN
6,BLD 06,BROOKLYN
7,BLD 15,BROOKLYN
8,BLD 13,BROOKLYN
9,BLD 11,BROOKLYN


In [17]:
# Define the start and end dates for your date dimension
start_date = consumption_water_df['Service Start Date'].min()
end_date = consumption_water_df['Service End Date'].max()

# Create a DataFrame with a range of dates
date_range = pd.date_range(start=start_date, end=end_date, freq='D')
date_df = pd.DataFrame(date_range, columns=['Date'])

# Extract attributes from the date
date_df['DateID'] = date_df['Date'].dt.strftime('%Y%m%d%H')
date_df['YearNumber'] = date_df['Date'].dt.year
date_df['MonthNumber'] = date_df['Date'].dt.month
date_df['DayNumber'] = date_df['Date'].dt.day
date_df['DayOfWeek'] = date_df['Date'].dt.dayofweek
date_df['DayName'] = date_df['Date'].dt.day_name()
date_df['MonthName'] = date_df['Date'].dt.month_name()
date_df['QuarterNumber'] = date_df['Date'].dt.quarter
date_df['WeekOfYear'] = date_df['Date'].dt.isocalendar().week

new_order = ['DateID', 'Date', 'YearNumber', 'QuarterNumber', 'MonthNumber', 'DayNumber', 'MonthName', 'DayName', 'WeekOfYear']
date_df = date_df[new_order]

# Display the resulting DataFrame
date_df

Unnamed: 0,DateID,Date,YearNumber,QuarterNumber,MonthNumber,DayNumber,MonthName,DayName,WeekOfYear
0,2010082200,2010-08-22,2010,3,8,22,August,Sunday,33
1,2010082300,2010-08-23,2010,3,8,23,August,Monday,34
2,2010082400,2010-08-24,2010,3,8,24,August,Tuesday,34
3,2010082500,2010-08-25,2010,3,8,25,August,Wednesday,34
4,2010082600,2010-08-26,2010,3,8,26,August,Thursday,34
...,...,...,...,...,...,...,...,...,...
4537,2023012300,2023-01-23,2023,1,1,23,January,Monday,4
4538,2023012400,2023-01-24,2023,1,1,24,January,Tuesday,4
4539,2023012500,2023-01-25,2023,1,1,25,January,Wednesday,4
4540,2023012600,2023-01-26,2023,1,1,26,January,Thursday,4


In [18]:
# Calculate service duration
consumption_water_df['ServiceDuration'] = pd.to_datetime(consumption_water_df['Service End Date']) - pd.to_datetime(consumption_water_df['Service Start Date'])

# Create FactID column
consumption_water_df['FactID'] = range(1, len(consumption_water_df) + 1)

# Rename columns
new_column_names = {
    'Vendor Name': 'VendorID',
    'Location': 'LocationID',
    'Development Name': 'DevelopmentID',
    'Meter Number': 'MeterNumber',
    'Consumption (HCF)': 'ConsumptionVolume',
    'Current Charges': 'CurrentCharges',
    'Water&Sewer Charges': 'WaterAndSewerCharges',
    'Other Charges': 'OtherCharges'
}

consumption_water_df = consumption_water_df.rename(columns=new_column_names)

# Check if the columns exist in the DataFrame before reordering
columns_to_order = ['VendorID', 'LocationID', 'Service Start Date', 'Service End Date', 'DevelopmentID', 'MeterNumber', 'ConsumptionVolume', 'CurrentCharges', 'WaterAndSewerCharges', 'OtherCharges', 'ServiceDuration']
existing_columns = [col for col in columns_to_order if col in consumption_water_df.columns]

# Reorder columns
consumption_water_df = consumption_water_df[existing_columns]

# Create ServiceStartDateID and ServiceEndDateID
consumption_water_df['ServiceStartDateID'] = pd.to_datetime(consumption_water_df['Service Start Date']).dt.strftime('%Y%m%d%H')
consumption_water_df['ServiceEndDateID'] = pd.to_datetime(consumption_water_df['Service End Date']).dt.strftime('%Y%m%d%H')

# Display the DataFrame
consumption_water_df.head()

Unnamed: 0,VendorID,LocationID,Service Start Date,Service End Date,DevelopmentID,MeterNumber,ConsumptionVolume,CurrentCharges,WaterAndSewerCharges,OtherCharges,ServiceDuration,ServiceStartDateID,ServiceEndDateID
0,NEW YORK CITY WATER BOARD,BLD 09,2019-12-23,2020-01-26,BAISLEY PARK,K13060723,19,196.35,196.35,0.0,34 days,2019122300,2020012600
1,NEW YORK CITY WATER BOARD,BLD 09,2020-01-26,2020-02-24,BAISLEY PARK,K13060723,25,258.35,258.35,0.0,29 days,2020012600,2020022400
2,NEW YORK CITY WATER BOARD,BLD 09,2020-02-24,2020-03-23,BAISLEY PARK,K13060723,21,217.02,217.02,0.0,28 days,2020022400,2020032300
3,NEW YORK CITY WATER BOARD,BLD 09,2020-03-23,2020-04-23,BAISLEY PARK,K13060723,10,103.34,103.34,0.0,31 days,2020032300,2020042300
4,NEW YORK CITY WATER BOARD,BLD 25 - Community Center,2019-12-23,2020-01-26,BAY VIEW,E17250205,7,72.34,72.34,0.0,34 days,2019122300,2020012600
