In [1]:
import seaborn as sns
import numpy as np
import scipy.stats as stats
import matplotlib.pyplot as plt
import pandas as pd
import findspark
# Raw string: in a normal literal '\s' is an invalid escape sequence, which
# newer Python versions warn about. The path value itself is unchanged.
findspark.init(r'C:\spark')
import pyspark
from pyspark.sql import SparkSession,Row

from sklearn.impute import SimpleImputer
from sklearn.preprocessing import normalize

from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor,LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import IsotonicRegression


# Notebook-wide SparkSession (application name 'basics'). The functions below
# reference this module-level `spark` directly instead of taking it as an argument.
spark = SparkSession.builder.appName('basics').getOrCreate()

In [2]:
def load_average_vehicle(file_dir):
    """Read the average-vehicle table from an Excel workbook.

    Parameters
    ----------
    file_dir
        Anything ``pandas.read_excel`` accepts as its first argument
        (typically a path to the workbook).

    Returns
    -------
    pandas.DataFrame
        At most 19 data rows from columns A:AH of the sheet named
        "2.1, 2.2, 2.3,2.4", with spreadsheet row index 2 used as the header.
    """
    read_options = {
        "sheet_name": "2.1, 2.2, 2.3,2.4",
        "header": 2,
        "nrows": 19,
        "usecols": "A:AH",
    }
    return pd.read_excel(file_dir, **read_options)

In [3]:
def load_light_fleet_age(file_dir):
    """Read the light-fleet age table from an Excel workbook.

    Parameters
    ----------
    file_dir
        Anything ``pandas.read_excel`` accepts as its first argument
        (typically a path to the workbook).

    Returns
    -------
    pandas.DataFrame
        At most 7 data rows of the sheet named "2.10", with spreadsheet row
        index 1 used as the header. All columns are read.
    """
    read_options = {
        "sheet_name": "2.10",
        "header": 1,
        "nrows": 7,
    }
    return pd.read_excel(file_dir, **read_options)

In [4]:
def load_co2_emission(file_dir):
    """Read the CO2-emission table from an Excel workbook.

    Parameters
    ----------
    file_dir
        Anything ``pandas.read_excel`` accepts as its first argument
        (typically a path to the workbook).

    Returns
    -------
    pandas.DataFrame
        At most 17 data rows from columns A:E of the sheet named "1.10",
        with spreadsheet row index 2 used as the header.
    """
    read_options = {
        "sheet_name": "1.10",
        "header": 2,
        "nrows": 17,
        "usecols": "A:E",
    }
    return pd.read_excel(file_dir, **read_options)

In [5]:
def get_sort_array_index(array):
    """Print the sorted values of ``array`` and the indices that sort it.

    The index order is computed with a stable ``np.argsort``, so every
    position appears exactly once and ties keep their original relative
    order. (The previous nested equality scan emitted each index once per
    duplicate value and skipped NaN entries entirely.)

    Parameters
    ----------
    array
        One-dimensional sequence of numbers; anything ``np.asarray`` accepts.

    Returns
    -------
    list of int
        Positions of ``array`` arranged so the referenced values ascend.
        The same list is printed, preceded by the sorted values.
    """
    values = np.asarray(array)
    print(np.sort(values))

    # A stable sort keeps duplicate values in their original left-to-right order.
    order = np.argsort(values, kind='stable').tolist()
    print(order)

    return order

In [6]:
def step_2_4_1_imputation_co2_emission_df(co2_emission_df):
    """Add mean-imputed rows for the years 2000 and 2018 to the CO2 table.

    A ``SimpleImputer`` with the 'mean' strategy is fitted on the input frame
    and used to fill two placeholder rows (a year followed by four NaNs).
    The filled rows are appended, the result is sorted by 'Year', and the
    index is renumbered from 0.

    Parameters
    ----------
    co2_emission_df : pandas.DataFrame
        Must contain a 'Year' column.
        NOTE(review): the placeholder layout assumes exactly five numeric
        columns with the year first - confirm against the loader.

    Returns
    -------
    pandas.DataFrame
        A new frame; the input is not modified.
    """
    # One placeholder per missing year: the year itself plus four unknown readings.
    placeholder_rows = [[year] + [np.nan] * 4 for year in (2000, 2018)]

    mean_imputer = SimpleImputer(missing_values=np.nan, strategy='mean')
    mean_imputer.fit(co2_emission_df)
    filled_rows = pd.DataFrame(
        mean_imputer.transform(placeholder_rows),
        columns=co2_emission_df.columns,
    )

    combined = pd.concat([co2_emission_df, filled_rows]).sort_values(by=['Year'])
    combined.index = list(range(len(combined.index)))

    return combined

In [7]:
def plot_normality(target, name):
    """Plot the normal PDF parameterised by the mean and std of ``target``.

    The curve is evaluated at 1000 evenly spaced points between the minimum
    and maximum of ``target``. Only the fitted density is drawn; the data
    itself is not plotted.

    Parameters
    ----------
    target
        Numeric array-like whose mean, standard deviation (``np.std``
        default), minimum and maximum are used.
    name
        Label interpolated into the figure title.
    """
    mu = np.mean(target)
    sigma = np.std(target)

    grid = np.linspace(np.min(target), np.max(target), 1000)
    density = stats.norm.pdf(grid, mu, sigma)

    plt.plot(grid, density, label="PDF")
    plt.grid()
    plt.title('Check Normal Distribution for %s' % name, fontsize=10)
    plt.xlabel('x')
    plt.ylabel('Probability')
    plt.show()

In [8]:
def step_2_4_2_check_normality(number_fleets_df, light_fleet_age_df,co2_emission_df):
    """Draw one ``plot_normality`` figure per series of interest.

    Figures are produced in this order: four fleet-count columns, the first
    five rows of the age table (positions 1 to 19 of each row, cast to int),
    then two CO2-emission columns.

    Parameters
    ----------
    number_fleets_df : pandas.DataFrame
        Needs the columns 'Total LPV new', ' Total LPV used',
        'Total LCV new' and ' Total LCV used' (note the leading spaces).
    light_fleet_age_df : pandas.DataFrame
        Needs at least five rows.
    co2_emission_df : pandas.DataFrame
        Needs the columns 'Light passenger' and 'Light commercial'.
    """
    # (column in number_fleets_df, title suffix)
    fleet_plots = (
        ('Total LPV new', 'Total LPV new'),
        (' Total LPV used', 'Total LPV used'),
        ('Total LCV new', 'Total LCV new'),
        (' Total LCV used', 'Total LCV used'),
    )
    for column, title in fleet_plots:
        plot_normality(number_fleets_df[column], title)

    # The row position in light_fleet_age_df matches the position in this tuple.
    age_titles = (
        '0-4 age group',
        '5-9 age group',
        '10-14 age group',
        '15-19 age group',
        '20+ age group',
    )
    for row_position, title in enumerate(age_titles):
        yearly_values = light_fleet_age_df.iloc[row_position][1:20].astype(int)
        plot_normality(np.array(yearly_values), title)

    # (column in co2_emission_df, title suffix)
    emission_plots = (
        ('Light passenger', 'Light passenger co2 emssion'),
        ('Light commercial', 'Light commercial co2 emission'),
    )
    for column, title in emission_plots:
        plot_normality(co2_emission_df[column], title)

In [9]:
def step_3_1_clean_light_age_distribution(light_fleet_age_df):
    """Reshape the raw age table into one row per year (2000 to 2018).

    The input is transposed. Transposed rows 1 to 19 become the count columns
    (source row 6 dropped), and rows 20 onward become the percentage columns
    (source row 5 dropped). Both halves are labelled with the years 2000 to
    2018, joined side by side, and prefixed with a 'Period' column.

    Parameters
    ----------
    light_fleet_age_df : pandas.DataFrame
        Frame whose row labels are 0..6 and whose transpose has 39 rows
        (one leading row, 19 count rows, 19 percentage rows).

    Returns
    -------
    pandas.DataFrame
        19 rows indexed 0..18 with 'Period', six count columns and six
        percentage columns. The input frame is not modified.
    """
    years = list(range(2000, 2019))
    transposed = light_fleet_age_df.T.reset_index(drop=True)

    # Count block: transposed rows 1..19, without source row 6.
    counts = transposed.iloc[1:20].drop(columns=[6])
    counts.columns = ['0-4 years', '5-9 years', '10-14 years', '15-19 years', '20+ years', 'Total']
    counts.index = years

    # Percentage block: the remaining rows, without source row 5.
    shares = transposed.iloc[20:].drop(columns=5)
    shares.columns = ['0-4 years percentage', '5-9 years  percentage',
                      '10-14 years percentage', '15-19 years percentage',
                      '20+ years percentage', '15+ years percentage']
    shares.index = years

    combined = pd.concat([counts, shares], axis=1, join='inner')
    combined.insert(0, 'Period', combined.index)
    combined.index = list(range(len(combined.index)))

    return combined

In [10]:
def step_3_3_construct_new_distribution_df(nums_columns, percentage_columns, number_fleets_df, new_age_distribution_df):
    """Multiply every percentage column by every fleet-count column.

    Each product column is named '<percentage label> of <count label>'. The
    percentage label is the column name without its last 11 characters, and
    the count label is the column name without its first 6 characters; both
    are whitespace-stripped.

    Parameters
    ----------
    nums_columns : iterable of str
        Column names to take from ``number_fleets_df``.
    percentage_columns : iterable of str
        Column names to take from ``new_age_distribution_df``.
    number_fleets_df, new_age_distribution_df : pandas.DataFrame
        Sources of the count and percentage columns respectively.

    Returns
    -------
    pandas.DataFrame
        One column per (count, percentage) pair, ordered count-major.
    """
    products = {
        '%s of %s' % (share_name[:-11].strip(), count_name[6:].strip()):
            new_age_distribution_df[share_name] * number_fleets_df[count_name]
        for count_name in nums_columns
        for share_name in percentage_columns
    }
    return pd.DataFrame(products)

In [11]:
def step_3_5_convert_object_to_int(data_df):
    """Cast every column that is not float64 to int64, in place.

    Parameters
    ----------
    data_df : pandas.DataFrame
        Modified in place: each non-float64 column is replaced by its
        ``astype(np.int64)`` version.

    Returns
    -------
    pandas.DataFrame
        The same object that was passed in.
    """
    for name in data_df.columns:
        # float64 columns are left untouched.
        if data_df.dtypes[name] == np.float64:
            continue
        data_df[name] = data_df[name].astype(np.int64)

    return data_df

In [12]:
def step_4_1_LPV_cols(cleaned_data_df):
    """Run a 16-component PCA over the LPV columns and print the results.

    The 16 listed columns are copied into a Spark DataFrame, assembled into
    a single 'features' vector, and projected with ``PCA(k=16)``. The
    function prints the explained variance, shows the projected rows, and
    for each of the first two projected rows prints the vector and passes it
    to ``get_sort_array_index``.

    Parameters
    ----------
    cleaned_data_df : pandas.DataFrame
        Must contain every column named in the list below.

    Returns
    -------
    None
        All results are printed.
    """
    lpv_columns = [
        'Period', 'Total LPV new', ' Total LPV used', 'Light passenger average age',
        '0-4 years of LPV new', '5-9 years of LPV new', '10-14 years of LPV new',
        '15-19 years of LPV new', '20+ years of LPV new', '15+ years of LPV new',
        '0-4 years of LPV used', '5-9 years of LPV used', '10-14 years of LPV used',
        '15-19 years of LPV used', '20+ years of LPV used', '15+ years of LPV used',
    ]

    lpv_spark_df = spark.createDataFrame(cleaned_data_df[lpv_columns])

    feature_assembler = VectorAssembler(inputCols=lpv_spark_df.columns, outputCol="features")
    assembled = feature_assembler.transform(lpv_spark_df)

    # k matches the number of input columns listed above.
    pca_model = PCA(k=16, inputCol="features", outputCol="pcaFeatures").fit(assembled)
    print(pca_model.explainedVariance)

    projected = pca_model.transform(assembled).select("pcaFeatures")
    projected.show(truncate=False)

    first_two_rows = projected.head(2)
    for position in (0, 1):
        components = first_two_rows[position][0]
        print(components)
        get_sort_array_index(components)


In [13]:
def step_4_1_LCV_cols(cleaned_data_df):
    """Run a 14-component PCA over the LCV columns and print the results.

    The 14 listed columns are copied into a Spark DataFrame, assembled into
    a single 'features' vector, and projected with ``PCA(k=14)``. The
    function prints the explained variance, shows the projected rows, then
    prints the first projected vector and passes it to
    ``get_sort_array_index``.

    Parameters
    ----------
    cleaned_data_df : pandas.DataFrame
        Must contain every column named in the list below.

    Returns
    -------
    None
        All results are printed.
    """
    lcv_columns = [
        'Period', 'Total LCV new', ' Total LCV used', 'Light commercial average age',
        '0-4 years of LCV new', '5-9 years of LCV new', '10-14 years of LCV new',
        '15-19 years of LCV new', '20+ years of LCV new',
        '0-4 years of LCV used', '5-9 years of LCV used', '10-14 years of LCV used',
        '15-19 years of LCV used', '20+ years of LCV used',
    ]

    lcv_spark_df = spark.createDataFrame(cleaned_data_df[lcv_columns])

    feature_assembler = VectorAssembler(inputCols=lcv_spark_df.columns, outputCol="features")
    assembled = feature_assembler.transform(lcv_spark_df)

    # k matches the number of input columns listed above.
    pca_model = PCA(k=14, inputCol="features", outputCol="pcaFeatures").fit(assembled)
    print(pca_model.explainedVariance)

    projected = pca_model.transform(assembled).select("pcaFeatures")
    projected.show(truncate=False)

    # Two rows are fetched, but only the first one is reported.
    leading_rows = projected.head(2)
    first_components = leading_rows[0][0]

    print(first_components)
    get_sort_array_index(first_components)

In [14]:
def step_4_2_normalization(data_df,LPV_cols,LCV_cols):
    """L2-normalise the LPV and LCV column subsets row by row.

    Each subset is passed to scikit-learn's ``normalize`` with ``axis=1`` and
    ``norm='l2'``, then wrapped in a fresh DataFrame that keeps the column
    names. The result gets a default index, not the index of ``data_df``.

    Parameters
    ----------
    data_df : pandas.DataFrame
        Source frame containing both column subsets.
    LPV_cols, LCV_cols : list of str
        Column names of the two subsets.

    Returns
    -------
    tuple of pandas.DataFrame
        ``(LPV_df, LCV_df)`` with normalised values.
    """
    def _l2_rows(columns):
        # Normalise the selected columns so each row has unit L2 norm.
        scaled = normalize(data_df[columns], axis=1, norm='l2')
        return pd.DataFrame(scaled, columns=columns)

    lpv_normalised = _l2_rows(LPV_cols)
    lcv_normalised = _l2_rows(LCV_cols)
    return lpv_normalised, lcv_normalised

In [None]:
def assemble_dataset(df, seed):
    """Build Spark train/test sets with a 'features' vector and a label.

    Every column except the last is assembled into 'features'; the last
    column is kept as the label. The rows are split with weights 0.7 / 0.3
    using ``seed``, and one all-zero row (13-dimensional zero vector, label
    0.0) is appended to both splits.

    Parameters
    ----------
    df : pandas.DataFrame
        Feature columns followed by the label column.
        NOTE(review): the appended zero vector has a fixed length of 13, so
        this presumably expects exactly 13 feature columns - confirm.
    seed
        Seed forwarded to ``randomSplit``.

    Returns
    -------
    tuple
        ``(train_data, test_data)`` Spark DataFrames.
    """
    frame = spark.createDataFrame(df)
    all_columns = frame.columns

    feature_assembler = VectorAssembler(inputCols=all_columns[:-1], outputCol="features")
    assembled = feature_assembler.transform(frame)

    labelled = assembled.select('features', all_columns[-1])
    train_part, test_part = labelled.randomSplit([0.7, 0.3], seed=seed)

    # A single all-zero observation, unioned by position onto both splits.
    zero_rows = spark.createDataFrame([[Vectors.dense(np.zeros(13)), 0.0]])

    train_part = train_part.union(zero_rows)
    test_part = test_part.union(zero_rows)
    return train_part, test_part

In [None]:
def step_6_vertify_dt(train_data, test_data, labelcol):
    """Fit a decision-tree regressor and print its R2 on the test data.

    Parameters
    ----------
    train_data, test_data
        Spark DataFrames with a 'features' column and the label column.
    labelcol : str
        Name of the label column.

    Returns
    -------
    None
        The predictions are shown and the R2 score is printed.
    """
    tree_model = DecisionTreeRegressor(featuresCol='features', labelCol=labelcol).fit(train_data)

    scored = tree_model.transform(test_data)
    scored.show()

    r2_evaluator = RegressionEvaluator(
        labelCol=labelcol,
        predictionCol="prediction",
        metricName="r2",
    )
    score = r2_evaluator.evaluate(scored)
    print("R2 on test data = %g" % score)

In [None]:
def ridge_regressor(train_data, test_data, labelcol, reg_param):
    """Fit a regularised Spark ``LinearRegression`` and evaluate it.

    The estimator uses ``elasticNetParam=0`` and squared-error loss, with
    ``reg_param`` as the regularisation strength.

    Parameters
    ----------
    train_data, test_data
        Spark DataFrames holding the features and the label column.
    labelcol : str
        Name of the label column.
    reg_param : float
        Value passed as ``regParam``.

    Returns
    -------
    tuple
        ``(predictions, residuals, r2, model)``: the transformed test data,
        the residuals and R2 from ``model.evaluate(test_data)``, and the
        fitted model.
    """
    estimator = LinearRegression(
        labelCol=labelcol,
        regParam=reg_param,
        elasticNetParam=0,
        loss='squaredError',
    )
    fitted_model = estimator.fit(train_data)

    scored = fitted_model.transform(test_data)
    summary = fitted_model.evaluate(test_data)

    return scored, summary.residuals, summary.r2, fitted_model



In [None]:
def step_7_1_assemble_dataset(df, seed):
    """Build Spark train/test sets, padding each with three all-zero rows.

    Every column except the last is assembled into 'features'; the last
    column is kept as the label. The rows are split with weights 0.7 / 0.25
    using ``seed``, and three all-zero rows (13-dimensional zero vector,
    label 0.0) are appended to both splits.

    Parameters
    ----------
    df : pandas.DataFrame
        Feature columns followed by the label column.
        NOTE(review): the appended zero vectors have a fixed length of 13,
        so this presumably expects exactly 13 feature columns - confirm.
    seed
        Seed forwarded to ``randomSplit``.

    Returns
    -------
    tuple
        ``(train_data, test_data)`` Spark DataFrames.
    """
    frame = spark.createDataFrame(df)
    all_columns = frame.columns

    feature_assembler = VectorAssembler(inputCols=all_columns[:-1], outputCol="features")
    assembled = feature_assembler.transform(frame)

    labelled = assembled.select('features', all_columns[-1])
    # NOTE(review): these weights sum to 0.95, not 1 - confirm this is intended.
    train_part, test_part = labelled.randomSplit([0.7, 0.25], seed=seed)

    # Three identical all-zero observations, unioned by position onto both splits.
    zero_rows = spark.createDataFrame(
        [[Vectors.dense(np.zeros(13)), 0.0] for _ in range(3)]
    )

    train_part = train_part.union(zero_rows)
    test_part = test_part.union(zero_rows)
    return train_part, test_part

In [None]:
def step_7_1_objective_2_dataset(rows,columns):
    """Turn ``rows`` into a Spark DataFrame with an added 'features' vector.

    Every column except the last is assembled into 'features'. All original
    columns are kept in the result.

    Parameters
    ----------
    rows
        Data accepted by ``spark.createDataFrame``.
    columns
        Accepted for interface compatibility; not used by this function.

    Returns
    -------
    Spark DataFrame
        The input data plus the 'features' column.
    """
    frame = spark.createDataFrame(rows)
    feature_names = frame.columns[:-1]
    feature_assembler = VectorAssembler(inputCols=feature_names, outputCol="features")
    return feature_assembler.transform(frame)

In [None]:
def step_7_2_objective2_model(lrModel,test_data):
    """Score ``test_data`` with ``lrModel`` and summarise the predictions.

    Parameters
    ----------
    lrModel
        Fitted model exposing ``transform``.
    test_data
        Data to score; the transformed result must expose a 'prediction'
        column.

    Returns
    -------
    tuple
        ``(predictions, pred_values, first, rest_total)``: the transformed
        data, the collected predictions as a list, the first prediction, and
        the sum of all remaining predictions.
    """
    scored = lrModel.transform(test_data)

    collected = scored.select('prediction').collect()
    values = [row.prediction for row in collected]

    first_value = values[0]
    remaining_total = np.sum(values[1:])
    return scored, values, first_value, remaining_total

In [None]:
def step_8_2_percentage_viz(percentage_list, columns):
    """Show a labelled pie chart of ``percentage_list``.

    The chart is drawn on the left cell of a 1x2 subplot grid in a 9 x 5.0625
    inch figure. Each wedge is annotated with its percentage to one decimal
    place.

    Parameters
    ----------
    percentage_list : sequence of float
        Wedge sizes; the first entry also sets the start angle.
    columns : sequence of str
        Wedge labels, in the same order as ``percentage_list``.
    """
    figure = plt.figure(figsize=(9, 5.0625))
    left_axes = figure.add_subplot(121)

    # Rotate so that the first wedge is split by the x-axis.
    start_angle = -180 * percentage_list[0]

    left_axes.pie(
        percentage_list,
        autopct='%1.1f%%',
        startangle=start_angle,
        labels=columns,
    )
    plt.show()

In [None]:
def step_8_2_regression_line(data):
    """Draw a seaborn regression plot of 'pred_value' against 'real_value'.

    Parameters
    ----------
    data
        Table-like object with 'real_value' and 'pred_value' columns.

    Returns
    -------
    None
        The axes returned by seaborn are discarded.
    """
    axis_mapping = {"x": "real_value", "y": "pred_value"}
    sns.regplot(data=data, **axis_mapping)

In [None]:
def step_8_2_residuals(data):
    """Draw a seaborn residual plot of 'pred_value' against 'real_value'.

    Parameters
    ----------
    data
        Table-like object with 'real_value' and 'pred_value' columns.

    Returns
    -------
    None
        The axes returned by seaborn are discarded.
    """
    marker_options = {"s": 80}  # scatter marker size
    sns.residplot(data=data, x="real_value", y="pred_value", scatter_kws=marker_options)