In [None]:
import requests 
import datetime as dt
from datetime import date
from datetime import timedelta
from datetime import datetime
import json
import pandas as pd
import io 
import zipfile
import csv
from pyspark.sql.types import StructType, StructField, TimestampType, StringType, IntegerType, DoubleType



In [None]:
# Simple helper that builds the date stamps used in the names of the downloadable files.
# Returns a list of first-of-month date strings (YYYYMMDD) from January 2017 through the current month.

def create_date_list(start='20170101', today=None):
    """Return first-of-month date strings from ``start`` through the current month.

    Each entry is formatted ``YYYYMMDD`` (for example ``'20170101'``), one per
    month, in chronological order.

    Args:
        start: First month to include, in any form ``pd.date_range`` accepts.
            Defaults to ``'20170101'``.
        today: Object with ``year`` and ``month`` attributes whose month ends
            the list. Defaults to ``datetime.now()``.

    Returns:
        list[str]: Month-start dates as ``YYYYMMDD`` strings.
    """
    # Read the clock once so year and month always come from the same instant.
    if today is None:
        today = datetime.now()

    # Zero-pad the month to two digits. Unconditionally prefixing "0" produced a
    # malformed nine-character date for October through December.
    end_date = f"{today.year:04d}{today.month:02d}01"

    dateList = pd.date_range(start=start, end=end_date, freq='MS').strftime('%Y%m%d').to_list()
    return dateList

In [None]:
#one-time function to download the historical data (January 2017 onward) that is later saved to csv

def nyiso_scrape(base_url):
    """Download the monthly NYISO zip archives and combine them into one DataFrame.

    For every month returned by ``create_date_list()``, the monthly zip archive
    is downloaded, every file inside it is read with ``pd.read_csv``, and all
    resulting frames are concatenated. A short summary (shape, ``describe()``,
    columns containing nulls) is printed before the columns are renamed.

    Args:
        base_url: URL template for a monthly archive containing one ``{}``
            placeholder, which is replaced with the month's ``YYYYMMDD`` start
            date. A string without a placeholder is used unchanged for every
            month.

    Returns:
        pandas.DataFrame: Combined data with columns ``timestamp`` (parsed to
        datetime), ``zone``, ``ptid``, ``lbmp``, ``marginal_cost_losses`` and
        ``marginal_cost_congestion``. The row index is not reset, so each
        file's own row numbers repeat in the result.

    Raises:
        requests.HTTPError: If a download returns an error status.
        zipfile.BadZipFile: If a response body is not a valid zip archive.
    """
    dfl = []  # one DataFrame per daily file

    # Outer loop: monthly zip archives. Inner loop: daily files inside each archive.
    for month_start in create_date_list():
        # Honour the caller's template instead of a hard-coded copy of it.
        url = base_url.format(month_start)

        # Timeout keeps a stalled connection from hanging the notebook.
        r = requests.get(url, timeout=60)
        # Fail with the HTTP status rather than an opaque BadZipFile on an error page.
        r.raise_for_status()

        # Context managers release the archive and each member handle promptly.
        with zipfile.ZipFile(io.BytesIO(r.content), 'r') as zip_file_object:
            for file in zip_file_object.namelist():
                with zip_file_object.open(file) as daily_file:
                    dfl.append(pd.read_csv(daily_file))

    # Combine all dataframes in the list into one big dataframe at the end.
    comb = pd.concat(dfl)

    # Describe the dataset before renaming columns.
    print("shape of df:", comb.shape)
    print(comb.describe())

    # Report columns that contain nulls, if any.
    null_cols = comb.columns[comb.isnull().any()]
    print("null columns: ", null_cols)

    # Rename the source headers to snake_case column names.
    cols = {
        "Time Stamp": "timestamp",
        "Name": "zone",
        "PTID": "ptid",
        "LBMP ($/MWHr)": "lbmp",
        "Marginal Cost Losses ($/MWHr)": "marginal_cost_losses",
        "Marginal Cost Congestion ($/MWHr)": "marginal_cost_congestion"
    }
    comb = comb.rename(columns=cols)

    comb['timestamp'] = pd.to_datetime(comb['timestamp'])  # convert string to datetime
    return comb


In [None]:

# URL pattern of the monthly archives; the "{}" placeholder stands where the
# YYYYMMDD month-start date appears in the archive file name.
file_url = "http://mis.nyiso.com/public/csv/damlbmp/{}damlbmp_zone_csv.zip"

# Run the full historical download and display the combined frame.
comb = nyiso_scrape(base_url=file_url)
comb

shape of df: (1013025, 6)
               PTID  LBMP ($/MWHr)  Marginal Cost Losses ($/MWHr)  \
count  1.013025e+06   1.013025e+06                   1.013025e+06   
mean   6.178060e+04   3.450018e+01                   1.114778e+00   
std    3.923402e+01   2.779499e+01                   2.228361e+00   
min    6.175200e+04  -1.078000e+01                  -3.542000e+01   
25%    6.175500e+04   1.956000e+01                  -1.200000e-01   
50%    6.175900e+04   2.709000e+01                   7.800000e-01   
75%    6.184400e+04   3.911000e+01                   1.980000e+00   
max    6.184700e+04   6.534100e+02                   3.377000e+01   

       Marginal Cost Congestion ($/MWHr)  
count                       1.013025e+06  
mean                       -5.510448e+00  
std                         1.408813e+01  
min                        -4.573000e+02  
25%                        -5.600000e+00  
50%                        -5.100000e-01  
75%                         0.000000e+00  
max     

Unnamed: 0,timestamp,zone,ptid,lbmp,marginal_cost_losses,marginal_cost_congestion
0,2017-01-01 00:00:00,CAPITL,61757,42.06,0.68,-29.05
1,2017-01-01 00:00:00,CENTRL,61754,15.50,0.20,-2.98
2,2017-01-01 00:00:00,DUNWOD,61760,33.52,1.13,-20.06
3,2017-01-01 00:00:00,GENESE,61753,14.67,0.04,-2.31
4,2017-01-01 00:00:00,H Q,61844,11.89,-0.43,0.00
...,...,...,...,...,...,...
355,2024-09-14 23:00:00,NORTH,61755,27.70,0.08,0.00
356,2024-09-14 23:00:00,NPX,61845,28.64,1.02,0.00
357,2024-09-14 23:00:00,O H,61846,26.10,-1.52,0.00
358,2024-09-14 23:00:00,PJM,61847,27.70,0.08,0.00


In [None]:
# Persist the combined frame to a local CSV file, leaving out the row index.
comb.to_csv(path_or_buf="nyiso_lbmp.csv", index=False)

In [None]:
# Convert comb to a Spark DataFrame with an explicit schema so it can be saved
# as a Databricks Delta table. Every field is declared non-nullable.
# NOTE(review): the pandas column is named "timestamp" (see the rename in
# nyiso_scrape) but the first field here is "time_stamp" — confirm which name
# the table column is meant to have.
schema = StructType([
    StructField(field_name, field_type, False)
    for field_name, field_type in [
        ("time_stamp", TimestampType()),
        ("zone", StringType()),
        ("ptid", IntegerType()),
        ("lbmp", DoubleType()),
        ("marginal_cost_losses", DoubleType()),
        ("marginal_cost_congestion", DoubleType()),
    ]
])
sdf = spark.createDataFrame(comb, schema=schema)

In [None]:
# Write the Spark DataFrame out as a Delta-format Databricks table so it can be
# queried with SQL.
(
    sdf.write
    .format("delta")
    .saveAsTable("nyiso_dayahead_lbmp")
)