In [1]:
import numpy as np
import pandas as pd
from scipy import stats
import matplotlib.pyplot as plt
%matplotlib inline
import ease
import unittest
from sklearn.model_selection import train_test_split

In [4]:
df = pd.read_csv('../Arranged_Data/final_weater.csv')[['State', 'TotalMonthlyPrecip', 'TempSummer','TempWinter', 'Avgwindspeed']]
df = df[df.State != 'DC']
train,test=train_test_split(df,test_size=0.2)
print(train.head())
print(test.head())

    State  TotalMonthlyPrecip  TempSummer  TempWinter  Avgwindspeed
420    MD            5.366667   74.300000   39.600000      5.033333
133    CT            3.817500   65.512500   24.675000      7.159091
865    SD            1.268921   72.546667   10.775000     10.783453
812    PA            3.133710   73.628707   27.438410      6.787847
1      AK            5.604615   53.386970   27.537821      9.979630
     State  TotalMonthlyPrecip  TempSummer  TempWinter  Avgwindspeed
759     OK            1.992792   75.050000   43.891575     10.387013
170     DE            1.849091   74.050000   40.783333      7.559091
1056    WY            0.592151   72.552941   20.929412      9.361000
412     MA            3.741227   66.531481   27.383133      7.455963
942     UT            0.790973   71.330370   31.849630      7.236364


In [17]:
tw = test.iloc[0,3]
type(tw)

numpy.float64

In [6]:
# The function involves four input parameters then returns a result of vote
def rf(prec, ts, tw, ws):
    """
    This function is RandomForest classifier to intake user input temperature,
    precipitation, and windespeed to classify the possible states the user
    will be in, and returns a dictionary with states as the keys and
    the frequency or count as the values of each key.
       input = user input values, integer or float based.
       vote  = dictionary based output that contains the RF classified states,
               and each states frequency.
    """
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    import itertools as it
    import warnings
    
    # read arranged weatehr data, drop elements contained in state DC, and remove warnings
    train = pd.read_csv('../Arranged_Data/final_weater.csv')[[
            'State', 'TotalMonthlyPrecip', 'TempSummer',
            'TempWinter', 'Avgwindspeed']]
    train = train[train.State != 'DC']
    warnings.filterwarnings('ignore')
    
    # set precipitation, summer temperature, winter temperature, and windspeed as input parameters
    # set total number of trees equal to 500
    # creat a list for saving prediction results and a dictonary for vote outcome
    input_ = [prec, ts, tw, ws]
    tree_num = 100
    pred_list = []
    vote = {}
    rf = RandomForestClassifier(n_estimators = tree_num)
    rf.fit(train.iloc[:,1:5], train.State)

    for i in range(tree_num):
        dt = rf.estimators_[i]
        dt.fit(train.iloc[:,1:5], train.State)
        pred = dt.predict(input_)
        pred_list.append(pred[0])

    pred_key = []
    pred_key_count = []
    for key, group in it.groupby(sorted(pred_list)):
        pred_key.append(key)
        pred_key_count.append(len(list(group)))
    
    # the dictionary of vote contains states as keys and weight percent as values
    for i in range(len(pred_key)):
        vote[pred_key[i]] = pred_key_count[i]/tree_num
    return vote

In [7]:
prec = test.iloc[0,1]
ts = test.iloc[0,2]
tw = test.iloc[0,3]
ws = test.iloc[0,4]
rf(prec, ts, tw, ws)

{'OK': 1.0}

In [8]:
# The function returns the average capacity of resources by multiple their average plant capacity with vote results
def avg_capacity(vote):
    """
    creat a null list
    set original resources sum euqal to zero
    for state in vote keys
        average capacity of resource = average plant capacity of particular resource in specific state * vote results specific state
    return average capacity
    """
    average_plant_capacity = pd.read_csv('../Arranged_Data/average_plant_capacity.csv')
    avg_cap_list = []
    coal_sum = 0
    ng_sum = 0
    petro_sum = 0
    hydro_sum = 0
    solar_sum = 0
    wind_sum =0
    for i in vote.keys():
        coal_sum += int(average_plant_capacity.Coal[average_plant_capacity.State == i]) * vote[i]
        ng_sum += int(average_plant_capacity.NG[average_plant_capacity.State == i]) * vote[i]
        petro_sum += int(average_plant_capacity.Petro[average_plant_capacity.State == i]) * vote[i]
        hydro_sum += int(average_plant_capacity.Hydro[average_plant_capacity.State == i]) * vote[i]
        solar_sum += int(average_plant_capacity.Solar[average_plant_capacity.State == i]) * vote[i]
        wind_sum += int(average_plant_capacity.Wind[average_plant_capacity.State == i]) * vote[i]
    return([coal_sum, ng_sum, petro_sum, hydro_sum, solar_sum, wind_sum])

In [9]:
print(avg_capacity({'ID': 1.0}))
print(type(avg_capacity({'ID': 1.0})))

[19748.0, 211310.0, 30.0, 59166.0, 0.0, 35471.0]
<class 'list'>


In [10]:
# write up a function to select possible type of resources
def possible_type(avg_cap_list):
    """
    creat a empty list to store results of possible type
    for some type in all types
        calculat p value and set up confidence level as 95%
            if p less than alpha, pass
            else, append possible type to empty list
    return possible type 
    """
    cap_pop = pd.read_csv('../Arranged_Data/average_plant_capacity.csv')
    e_type = ['Coal', 'NG', 'Petro', 'Hydro', 'Solar', 'Wind']
    possible_type_list = []
    for i in range(len(e_type)):
        p_value = stats.ttest_1samp(cap_pop[cap_pop[e_type[i]] != 0][e_type[i]], avg_cap_list[i])[1]
        alpha = 0.05  # confidence level
        if avg_cap_list[i] < cap_pop[cap_pop[e_type[i]] !=0][e_type[i]].mean():
            if p_value < alpha:
                pass
            else:
                p_value = -(1 - p_value)
                possible_type_list.append([p_value, avg_cap_list[i], e_type[i]])
        else:
            p_value = (1 - p_value)
            possible_type_list.append([p_value, avg_cap_list[i], e_type[i]])
    return possible_type_list

In [27]:
a = possible_type([3427272.05, 601160.0700000001, 132529.37000000002, 159063.21, 2253.23, 149414.79999999996])
a.sort()
a[2]

[0.9999957284484875, 601160.0700000001, 'NG']

In [12]:
def clean_or_conv(possible_type_list):
    """
    initialize a null list to store conventional resource
    initialize a null lsit to store clean resource
    for some type in all types
        if type equals coal, natural gas, or petroleum, append it to conventional lsit
        elseif type equals hydro, soalr, or wind, append it to clean list
    retun conventional list, clean list
    """
    clean_list = []
    conventional_list = []
    for i in possible_type_list:
        if i[2] == 'Coal' or i[2] == 'NG' or i[2] == 'Petro':
            conventional_list.append(i)
        elif i[2] == 'Hydro' or i[2] == 'Solar' or i[2] == 'Wind':
            clean_list.append(i)
    return conventional_list, clean_list

In [13]:
conv, clean = clean_or_conv([[0.95810457341493549, 3427272.05, 'Coal'],
 [-0.8923811497455455, 601160.0700000001, 'NG'],
 [0.99926594657247059, 132529.37000000002, 'Petro'],
 [-0.90421357886564524, 159063.21, 'Hydro'],
 [-0.83811357367028649, 149414.79999999996, 'Wind']])
clean

[[-0.9042135788656452, 159063.21, 'Hydro'],
 [-0.8381135736702865, 149414.79999999996, 'Wind']]

In [14]:
# The function returns the average cost of resources by multiple their average cost with vote results
def avg_cost(vote):
    """
    set original resources cost euqal to zero
    for state in vote keys
        average cost of resource = average cost of particular resource in specific state * vote results of specific state
    creat a dictionary to store average cost for each type of resource
    return average cost dictinary
    """
    import pandas as pd
    cost = pd.read_csv('../Arranged_Data/Cost/df_cost.csv')
    coal_sum = 0
    ng_sum = 0
    petro_sum = 0
    hydro_sum = 0
    solar_sum = 0
    wind_sum =0
    for i in vote.keys():
        coal_sum += int(cost.Coal[cost.State == i]) * vote[i]
        ng_sum += int(cost.NG[cost.State == i]) * vote[i]
        petro_sum += int(cost.Petro[cost.State == i]) * vote[i]
        hydro_sum += int(cost.Hydro[cost.State == i]) * vote[i]
        solar_sum += int(cost.solar[cost.State == i]) * vote[i]
        wind_sum += int(cost.WindCost[cost.State == i]) * vote[i]
    avg_cost_dict = {'Coal':coal_sum, 'NG':ng_sum, 'Petro':petro_sum, 'Hydro':hydro_sum, 'Solar':solar_sum, 'Wind':wind_sum}
    return avg_cost_dict

In [15]:
a = avg_cost({'ID': 1.0})
len(a.keys())

6

In [16]:
def sort_and_pick(source_list):
    """"""
    source_list.sort()
    if len(source_list) == 3:
        ref = source_list[2]
        if abs(source_list[2][0] - source_list[1][0]) < 0.05 and abs(source_list[1][0] - source_list[0][0]) < 0.05 :
            for i in source_list:
                if i[1] > ref[1]:
                    ref = i
        elif abs(source_list[2][0] - source_list[1][0]) < 0.05 and abs(source_list[1][0] - source_list[0][0]) >= 0.05:
            if source_list[1][1] > ref[1]:
                ref = source_list[1]
    elif len(source_list) == 2:
        ref = source_list[1]
        if abs(source_list[1][0] - source_list[0][0]) < 0.05 :
            if source_list[0][1] > ref[1]:
                ref = source_list[0]
    elif len(source_list) == 1:
        ref = source_list[0]
    else:
        ref = []
    return ref

In [23]:
sort_and_pick(conv)

[0.9581045734149355, 3427272.05, 'Coal']

In [57]:
class TEST_clean_or_conv(unittest.TestCase):
    def test_clean_or_conv(self):
        test = pd.read_csv('../Arranged_Data/test_dataset.csv')
        prec = test.iloc[0,1]
        ts = test.iloc[0,2]
        tw = test.iloc[0,3]
        ws = test.iloc[0,4]
        vote = ease.rf(prec, ts, tw, ws)
        avg_capacity = ease.avg_capacity(vote)
        possible_type = ease.possible_type(avg_capacity)
        conv, clean = ease.clean_or_conv(possible_type)
        
        # test the type of input
        self.assertIsInstance(possible_type,list)
        # test the type of output
        self.assertIsInstance(conv,list)
        # test the element type inside of clean list
        self.assertIsInstance(clean[0][2],str)
        # test if clean source exist in conventional list.
        conv_list = ['Coal','NG','Petro']
        clean_list = ['Hydro','Solar','Wind']
        self.assertNotIn(conv[0][2], clean_list)
        self.assertNotIn(clean[0][2], conv_list)
        
class TEST_avg_cost(unittest.TestCase):
    def test_avg_cost(self):
        test = pd.read_csv('../Arranged_Data/test_dataset.csv')
        prec = test.iloc[0,1]
        ts = test.iloc[0,2]
        tw = test.iloc[0,3]
        ws = test.iloc[0,4]
        vote = ease.rf(prec, ts, tw, ws)
        avg_cost = ease.avg_cost(vote)
        # test the type of input
        self.assertIsInstance(vote,dict)
        # test the type of output
        self.assertIsInstance(avg_cost,dict)
        # test the output length
        self.assertEqual(len(avg_cost.keys()),6)

class TEST_rev_plot(unittest.TestCase):
    def test_rev_plot(self):
        esales = pd.read_csv('../Arranged_Data/Cost/Sale_CO2_tax.csv', skiprows= 1, names = ['Year', 'Sale', 'CO2_tax']) 
        # test if there exist nan in cost dataframe
        self.assertEqual(esales.isnull().sum().sum(),0)
        # test input
        test = pd.read_csv('../Arranged_Data/test_dataset.csv')
        prec = test.iloc[0,1]
        ts = test.iloc[0,2]
        tw = test.iloc[0,3]
        ws = test.iloc[0,4]
        capacity = 100000
        vote = ease.rf(prec, ts, tw, ws)
        avg_cost = ease.avg_cost(vote)
        e_type = ['Coal', 'NG', 'Petro', 'Hydro', 'Solar', 'Wind']
        self.assertIsInstance(avg_cost,dict)
        self.assertIsInstance(capacity,int)
        self.assertIsInstance(e_type,list)

class TEST_sort_and_pick(unittest.TestCase):
    def test_sort_and_pick(self):
        test = pd.read_csv('../Arranged_Data/test_dataset.csv')
        prec = test.iloc[0,1]
        ts = test.iloc[0,2]
        tw = test.iloc[0,3]
        ws = test.iloc[0,4]
        vote = ease.rf(prec, ts, tw, ws)
        avg_cap = ease.avg_capacity(vote)
        source_list = ease.possible_type(avg_cap)
        ref = 
        # test input type
        self.assertIsInstance(source_list,list)
        # test output type
        self.assertIsInstance(ref,list)
        # test if output is in input list
        assertIn(ref,source_list)
        
        

class TEST_suggest(unittest.TestCase):
    def test_suggest(self):
        test = pd.read_csv('../Arranged_Data/test_dataset.csv')
        prec = test.iloc[0,1]
        ts = test.iloc[0,2]
        tw = test.iloc[0,3]
        ws = test.iloc[0,4]
        capacity = 5000
        vote = ease.rf(prec, ts, tw, ws)
        avg_cap = ease.avg_capacity(vote)
        possible_type_list = ease.possible_type(avg_cap)
        conventional, clean = ease.clean_or_conv(possible_type_list)
        conventional = ease.sort_and_pick(conventional)
        clean = ease.sort_and_pick(clean)
        
        # test the type of input
        self.assertIsInstance(prec,np.float64)
        self.assertIsInstance(ts,np.float64)
        self.assertIsInstance(tw,np.float64)
        self.assertIsInstance(ws,np.float64)
        self.assertIsInstance(capacity,int)
        # test output
        if len(clean) == 0:
            self.assertEqual(revenue_clean,none)
        if clean[1] >= capacity:
            self.assertEqual(revenue_conv,none)
        
        
        
        
        
if __name__ == '__main__':
    unittest.main()

E
ERROR: C:\Users\yongquan\AppData\Roaming\jupyter\runtime\kernel-bfb1ee37-fc3c-47f0-8529-556c7109766b (unittest.loader._FailedTest)
----------------------------------------------------------------------
AttributeError: module '__main__' has no attribute 'C:\Users\yongquan\AppData\Roaming\jupyter\runtime\kernel-bfb1ee37-fc3c-47f0-8529-556c7109766b'

----------------------------------------------------------------------
Ran 1 test in 0.004s

FAILED (errors=1)


SystemExit: True