In [2]:
import json
import requests
from datetime import datetime, timedelta
import pandas as pd
import numpy as np

## Code

In [None]:
class SeasonalityScript:
    country_id = None
    start_date = None
    end_date = None
    wc = None
    df = None
    uk_country = None
    years = list()
    holidays = dict()

    def __init__(self, country_code, start_date, end_date, week_commencing, uk_country= None):
        self.country_id = country_code
        self.wc = week_commencing
        self.uk_country = uk_country
        self.start_date = datetime.strptime(start_date, r"%d/%m/%Y").strftime(r"%d/%m/%Y")
        self.end_date = datetime.strptime(end_date, r"%d/%m/%Y").strftime(r"%d/%m/%Y")

        start_year = int(datetime.strptime(start_date, r"%d/%m/%Y").strftime(r"%Y"))
        end_year = int(datetime.strptime(end_date, r"%d/%m/%Y").strftime(r"%Y"))

        while start_year <= end_year:
            self.years.append(start_year)
            start_year += 1
    
    def build_dataframe(self):
        dates = pd.date_range(start= self.start_date, end= self.end_date, freq= f"W-{self.wc}")

        self.df = pd.DataFrame({"account": "national", "date": dates})
        self.df["date"] = self.df["date"].dt.strftime(r"%d/%m/%Y")

        print("DataFrame built succesfully")
    
    def get_holidays(self):
        def __date_convertion(date):
            if self.wc == "MON":
                if date.weekday() != 0:
                    date = date - timedelta(date.weekday())
                    return date.strftime(r"%d/%m/%Y")
                else:
                    return date.strftime(r"%d/%m/%Y")
            elif self.wc == "TUE":
                if date.weekday() != 1:
                    date = date - timedelta(date.weekday() - 1)
                    return date.strftime(r"%d/%m/%Y")
                else:
                    return date.strftime(r"%d/%m/%Y")
            elif self.wc == "WED":
                if date.weekday() != 2:
                    date = date - timedelta(date.weekday() - 2)
                    return date.strftime(r"%d/%m/%Y")
                else:
                    return date.strftime(r"%d/%m/%Y")
            elif self.wc == "THU":
                if date.weekday() != 3:
                    date = date - timedelta(date.weekday() - 3)
                    return date.strftime(r"%d/%m/%Y")
                else:
                    return date.strftime(r"%d/%m/%Y")
            elif self.wc == "FRI":
                if date.weekday() != 4:
                    date = date - timedelta(date.weekday() - 4)
                    return date.strftime(r"%d/%m/%Y")
                else:
                    return date.strftime(r"%d/%m/%Y")
            elif self.wc == "SAT":
                if date.weekday() != 5:
                    date = date - timedelta(date.weekday() - 5)
                    return date.strftime(r"%d/%m/%Y")
                else:
                    return date.strftime(r"%d/%m/%Y")
            elif self.wc == "SUN":
                if date.weekday() != 6:
                    date = date - timedelta(date.weekday() - 6)
                    return date.strftime(r"%d/%m/%Y")
                else:
                    return date.strftime(r"%d/%m/%Y")
                
        # Code for countries requests (excl. the UK)
        if self.uk_country == None:
            print(f"Getting holidays for {self.country_id}...")

            for year in self.years:
                uri = f"https://date.nager.at/api/v3/PublicHolidays/{year}/{self.country_id}"
                connection = requests.get(uri)
                response = json.loads(connection.content)
                
                for item in response:
                    if item["types"][0] == "Public":
                        holiday_name = item["localName"]
                        holiday_date = datetime.strptime(item["date"], r"%Y-%m-%d")
                        
                        holiday_date = __date_convertion(holiday_date)
                        
                        if holiday_name not in self.holidays:
                            self.holidays[holiday_name] = list()
                            self.holidays[holiday_name].append(holiday_date)
                        else:
                            self.holidays[holiday_name].append(holiday_date)
                
            for holiday in self.holidays:
                new_column = list()
        
                for (index, row) in self.df.iterrows():
                    if row["date"] in self.holidays[holiday][:]:
                        new_column.append(1)
                    else:
                        new_column.append(0)

                self.df[holiday + " BH"] = new_column

            print(f"Holidays for {self.country_id} successfully added to DataFrame")
        
        # Code only for the UK requests
        if self.uk_country != None:
            country = f"{self.country_id}-{self.uk_country}"
            print(f"Getting holidays for {country}...")

            for year in self.years:
                uri = f"https://date.nager.at/api/v3/PublicHolidays/{year}/{self.country_id}"
                connection = requests.get(uri)
                response = json.loads(connection.content)
                
                for item in response:
                    try:
                        if item["types"][0] == "Public" and country in item["counties"]:
                            holiday_name = item["localName"]
                            holiday_date = datetime.strptime(item["date"], r"%Y-%m-%d")
                                
                            holiday_date = __date_convertion(holiday_date)
                                
                            if holiday_name not in self.holidays:
                                self.holidays[holiday_name] = list()
                                self.holidays[holiday_name].append(holiday_date)
                            else:
                                self.holidays[holiday_name].append(holiday_date)
                    except:
                        if item["types"][0] == "Public" or item["counties"] == None:
                            holiday_name = item["localName"]
                            holiday_date = datetime.strptime(item["date"], r"%Y-%m-%d")
                                    
                            holiday_date = __date_convertion(holiday_date)
                                    
                            if holiday_name not in self.holidays:
                                self.holidays[holiday_name] = list()
                                self.holidays[holiday_name].append(holiday_date)
                            else:
                                self.holidays[holiday_name].append(holiday_date)
                        
            for holiday in self.holidays:
                new_column = list()
        
                for (index, row) in self.df.iterrows():
                    if row["date"] in self.holidays[holiday][:]:
                        new_column.append(1)
                    else:
                        new_column.append(0)

                self.df[holiday + " BH"] = new_column
            
            self.df = self.df.rename(columns= {'''Queen’s Platinum Jubilee BH''': '''Queen's Platinum Jubilee BH''', '''Queen’s State Funeral BH''': '''Queen's State Funeral BH''',
                                              "Early May Bank Holiday BH": "Early May BH", "Spring Bank Holiday BH": "Spring BH", "Coronation Bank Holiday BH": "Coronation BH"})
            
            print(f"Holidays for {country} successfully added to DataFrame")
    
    def get_csv(self, outpath):
        print("Preparing CSV file...")

        # Creates an array with the DataFrame's headers
        # That row will be added at the top of the DataFrame before adding the blank rows
        headers_row = list()
        for header in self.df.columns:
            headers_row.append(header)

        headers_row = np.array([headers_row])

        # Saves the DataFrame into a CSV file and opens it without the headers
        self.df.to_csv(outpath + r"\Seasonality (testing).csv", index= False)
        self.df = pd.read_csv(outpath + r"\Seasonality (testing).csv", skiprows= [0], header= None)

        headers_row = pd.DataFrame(headers_row, columns= self.df.columns, index= [0])

        # Creates a DataFrame with the blank rows needed for the modeling tool
        rows = list()
        for r in range(0, 9):
            row = list()
            if r == 8:
                for c in range(0, len(self.df.columns)):
                    if c == 0 or c == 1: row.append("NaN")
                    else: row.append("SUB")
            else:        
                for c in range(0, len(self.df.columns)):
                    row.append("Blank")
            
            rows.append(row)

        blank = pd.DataFrame(data= rows, columns= self.df.columns)

        # Adds the headers row at the top of the new DataFrame
        self.df = pd.concat([headers_row, self.df], ignore_index= True)

        # Concatenate both DataFrames and saves the new CSV file
        self.df = pd.concat([blank, self.df], ignore_index= True)
        self.df = self.df.iloc[1:, :]
        self.df.to_csv(outpath + r"\Seasonality (testing).csv", index= False, header= None)
        print("CSV file successfully saved in folder")

# Testings

### Object call - testing

In [None]:
BankHolidays = SeasonalityScript("US", "01/01/2018", "24/07/2023", "MON")

BankHolidays.build_dataframe()
BankHolidays.get_holidays()

BankHolidays.get_csv(r"C:\Users\nicolas.kossacoff\Documents\Python\Seasonality")

### Class code

In [1]:
def date_convertion(wc, date):
    if wc == "MON":
        if date.weekday() != 0:
            date = date - timedelta(date.weekday())
            return date.strftime(r"%d/%m/%Y")
        else:
            return date.strftime(r"%d/%m/%Y")
    elif wc == "TUE":
        if date.weekday() != 1:
            date = date - timedelta(date.weekday() - 1)
            return date.strftime(r"%d/%m/%Y")
        else:
            return date.strftime(r"%d/%m/%Y")
    elif wc == "WED":
        if date.weekday() != 2:
            date = date - timedelta(date.weekday() - 2)
            return date.strftime(r"%d/%m/%Y")
        else:
            return date.strftime(r"%d/%m/%Y")
    elif wc == "THU":
        if date.weekday() != 3:
            date = date - timedelta(date.weekday() - 3)
            return date.strftime(r"%d/%m/%Y")
        else:
            return date.strftime(r"%d/%m/%Y")
    elif wc == "FRI":
        if date.weekday() != 4:
            date = date - timedelta(date.weekday() - 4)
            return date.strftime(r"%d/%m/%Y")
        else:
            return date.strftime(r"%d/%m/%Y")
    elif wc == "SAT":
        if date.weekday() != 5:
            date = date - timedelta(date.weekday() - 5)
            return date.strftime(r"%d/%m/%Y")
        else:
            return date.strftime(r"%d/%m/%Y")
    elif wc == "SUN":
        if date.weekday() != 6:
            date = date - timedelta(date.weekday() - 6)
            return date.strftime(r"%d/%m/%Y")
        else:
            return date.strftime(r"%d/%m/%Y")

In [7]:
date = datetime.strptime(r"30/12/2017", r"%d/%m/%Y")
date_convertion("SAT", date)

'30/12/2017'

In [10]:
wc = "SAT"
holidays = {}
years = []
start_date = "01/01/2018"
end_date = "24/07/2023"
country = "NZ"

# Years
start_year = int(datetime.strptime(start_date, r"%d/%m/%Y").strftime(r"%Y"))
end_year = int(datetime.strptime(end_date, r"%d/%m/%Y").strftime(r"%Y"))
while start_year <= end_year:
    years.append(start_year)
    start_year += 1

# DataFrame
dates = pd.date_range(start= start_date, end= end_date, freq= f"W-SAT")
df = pd.DataFrame({"account": "national", "Date": dates})
df["Date"] = df["Date"].dt.strftime(r"%d/%m/%Y")

for year in years:
    uri = f"https://date.nager.at/api/v3/PublicHolidays/{year}/IE"
    connection = requests.get(uri)
    response = json.loads(connection.content)
            
    for item in response:
        if item["types"][0] == "Public":
            holiday_name = item["localName"]
            holiday_date = datetime.strptime(item["date"], r"%Y-%m-%d")
                                
            holiday_date = date_convertion(wc, holiday_date)

            if holiday_name not in holidays:    
                holidays[holiday_name] = list()
                holidays[holiday_name].append(holiday_date)
            else:
                holidays[holiday_name].append(holiday_date)

print(holidays)

{'Lá Caille': ['06/01/2018', '05/01/2019', '04/01/2020', '02/01/2021', '01/01/2022', '31/12/2022'], 'Lá Fhéile Pádraig': ['17/03/2018', '16/03/2019', '21/03/2020', '20/03/2021', '19/03/2022', '18/03/2023'], 'Luan Cásca': ['07/04/2018', '27/04/2019', '18/04/2020', '10/04/2021', '23/04/2022', '15/04/2023'], 'Lá Bealtaine': ['12/05/2018', '11/05/2019', '09/05/2020', '08/05/2021', '07/05/2022', '06/05/2023'], 'Lá Saoire i mí an Mheithimh': ['09/06/2018', '08/06/2019', '06/06/2020', '12/06/2021', '11/06/2022', '10/06/2023'], 'Lá Saoire i mí Lúnasa': ['11/08/2018', '10/08/2019', '08/08/2020', '07/08/2021', '06/08/2022', '12/08/2023'], 'Lá Saoire i mí Dheireadh Fómhair': ['03/11/2018', '02/11/2019', '31/10/2020', '30/10/2021', '05/11/2022', '04/11/2023'], 'Lá Nollag': ['29/12/2018', '28/12/2019', '26/12/2020', '25/12/2021', '24/12/2022', '30/12/2023'], 'Lá Fhéile Stiofáin': ['29/12/2018', '28/12/2019', '26/12/2020', '25/12/2021', '31/12/2022', '30/12/2023'], 'Lá Fhéile Bríde': ['11/02/2023']}

In [None]:
for holiday in holidays:
    new_column = list()
    
    for (index, row) in df.iterrows():
        if row["Date"] in holidays[holiday][:]:
            new_column.append(1)
        else:
            new_column.append(0)

    df[holiday + " BH"] = new_column

In [None]:
df.columns

In [None]:
df.head(15)

In [None]:
df.to_csv(r"C:\Users\nicolas.kossacoff\Documents\Python\Seasonality\Seasonality (testing II).csv", index= False)

In [None]:
outpath = r"C:\Users\nicolas.kossacoff\Documents\Python\Seasonality"
df = pd.read_csv(outpath + r"\Seasonality (testing).csv", skiprows= [0], header= None)

rows = list()
for r in range(0, 9):
    row = list()
    if r == 8:
        for c in range(0, len(df.columns)):
            if c == 0 or c == 1: row.append("NaN")
            else: row.append("SUB")
    else:        
        for c in range(0, len(df.columns)):
            row.append("Blank")
    
    rows.append(row)

blank = pd.DataFrame(data= rows, columns= df.columns)


In [None]:
headers_row = list()
blank_row = list()
for header in df.columns:
    headers_row.append(header)

for column in range(0, len(df.columns)):
    blank_row.append("Blank")

headers_row = np.array([headers_row])
blank_row = np.array([blank_row])

blank_row = pd.DataFrame(blank_row, columns= df.columns, index= [0])
headers_row = pd.DataFrame(headers_row, columns= df.columns, index= [0])

df = pd.concat([headers_row, df], ignore_index= True)

In [None]:
df = pd.concat([blank, df], ignore_index= True)

In [None]:
outpath = r"C:\Users\nicolas.kossacoff\Documents\Python\Seasonality"
df = pd.read_csv(outpath + r"\Seasonality.csv")

headers_row = list()
for header in df.columns:
    headers_row.append(header)

headers_row = np.array([headers_row])

blank_row = list()
for column in range(0, len(df.columns)):
    blank_row.append("Blank")

blank_row = np.array([blank_row])

blank_row = pd.DataFrame(blank_row, columns= df.columns, index= [0])
df = pd.concat([blank_row, df], ignore_index= True)

df.head()

df.to_csv(outpath + r"\Seasonality (testing).csv", index= False)

In [None]:
df = pd.read_csv(outpath + r"\Seasonality (testing).csv", skiprows= [0])
df.head()

In [None]:
headers_row = pd.DataFrame(headers_row, columns= df.columns, index= [0])
headers_row.head()

In [None]:
rows = list()
for r in range(0, 9):
    row = list()
    if r == 8:
        for c in range(0, len(df.columns)):
            if c == 0 or c == 1: row.append("NaN")
            else: row.append("SUB")
    else:        
        for c in range(0, len(df.columns)):
            row.append("Blank")
            
    rows.append(row)

blank = pd.DataFrame(data= rows, columns= df.columns)
blank.head(10)

In [None]:
df = pd.concat([headers_row, df], ignore_index= True)
df.head()

In [None]:
df = pd.concat([blank, df], ignore_index= True)
df.head(15)

In [None]:
df.to_csv(outpath + r"\Seasonality (testing).csv", index= False)

### Weather

In [None]:
import json
import requests
from datetime import datetime, timedelta
import pandas as pd
from meteostat import Stations, Point, Daily
import numpy as np

Daily.cores = 12

start_date = datetime(2018, 1, 1)
end_date = datetime(2023, 7, 24)

stations = Stations()
stations = stations.region("US")
stations = stations.inventory("daily", (start_date, end_date))
stations = stations.fetch(sample= True)

data = Daily(stations, start= start_date, end= end_date)
data = data.normalize().aggregate("1D", spatial= True).fetch()

data['tavg_normalised']=(data['tavg']-data['tavg'].mean())/data['tavg'].std()
data['prcp_normalised']=(data['prcp']-data['prcp'].mean())/data['prcp'].std()
data['snow_normalised']=(data['snow']-data['snow'].mean())/data['snow'].std()
    
data.replace(np.nan, 0, inplace = True)
    
data = np.around(data, decimals = 2)
    
data = data.reset_index()
    
#convert daily data to weekly 
# by PK
climate = data.resample('W-Mon', label='right', closed = 'right', on='time').mean().reset_index().sort_values(by='time')
# Rounding to 2 decimal places
climate = np.around(climate, decimals = 2)

In [None]:
data = data.normalize().aggregate("1D", spatial= True).fetch()

data['tavg_normalised']=(data['tavg']-data['tavg'].mean())/data['tavg'].std()
data['prcp_normalised']=(data['prcp']-data['prcp'].mean())/data['prcp'].std()
data['snow_normalised']=(data['snow']-data['snow'].mean())/data['snow'].std()
    
data.replace(np.nan, 0, inplace = True)
    
data = np.around(data, decimals = 2)
    
data = data.reset_index()
    
#convert daily data to weekly 
# by PK
climate = data.resample('W-Mon', label='right', closed = 'right', on='time').mean().reset_index().sort_values(by='time')
# Rounding to 2 decimal places
climate = np.around(climate, decimals = 2)