## State Summaries From Electricity Data

### Initial Setup

In [1]:
import os, sys
import pprint as p
import pyspark.sql.functions as pysF
%matplotlib widget
%matplotlib inline

py_file_path = os.path.join(
    os.getcwd(),
    "..",
    ".."
)

sys.path.append(py_file_path)
from app.SparkTools import MyPySpark

MySpark = None

#ensure only one sc and spark instance is running
global MySpark
MySpark = MySpark or MyPySpark(
    master = 'local[3]', 
    logger_name = 'jupyter')

### Filter and Join Data

In [2]:
# List the distinct metric names (value_type) in the dimension table so we can
# choose which one to summarise. Deduplicate and sort inside Spark rather than
# pulling the whole column to the driver with toPandas() and using set().
value_types_df = MySpark\
    .spark\
    .read\
    .parquet("/Processed/ElectricityDimDF")\
    .select("value_type")\
    .distinct()\
    .orderBy("value_type")

p.pprint([row["value_type"] for row in value_types_df.collect()])

['Consumption for useful thermal output',
 'Fuel consumption MMBtu',
 'Electric fuel consumption MMBtu',
 'Number of customer accounts',
 'Consumption for electricity generation (Btu)',
 'Fuel consumption quantity',
 'Average cost of fossil fuels for electricity generation',
 'Fossil-fuel stocks for electricity generation',
 'Net generation',
 'Total consumption (Btu)',
 'Total consumption',
 'Receipts of fossil fuels by electricity plants (Btu)',
 'Electric fuel consumption quantity',
 'Revenue from retail sales of electricity',
 'Consumption for electricity generation',
 'Average cost of fossil fuels for electricity generation (per Btu)',
 'Retail sales of electricity',
 'MMBtu per unit',
 'Average retail price of electricity',
 'Receipts of fossil fuels by electricity plants',
 'Quality of fossil fuels in electricity generation',
 'Consumption for useful thermal output (Btu)']


In [3]:
# Dimension rows for the chosen metric, restricted to the "all primemovers"
# engine type. Filter before projecting: `value_type` is not one of the
# selected columns, so it must be referenced while it is still available.
electricity_dim_df = MySpark\
    .spark\
    .read\
    .parquet("/Processed/ElectricityDimDF")\
    .filter(
        (pysF.col("value_type") == 'Fuel consumption MMBtu') &
        (pysF.col("engine_type") == 'all primemovers'))\
    .select("series_id", "state", "fuel_type", "engine_type", "units")\
    .withColumn(
        "fuel_type",
        # Normalise fuel names into identifier-safe pivot column names:
        # strip every character that is not alphanumeric or whitespace, then
        # replace each whitespace character with "_". Raw strings keep the
        # `\s` regex escape from being treated as a Python string escape.
        pysF.regexp_replace(
            pysF.regexp_replace(
                pysF.col("fuel_type"),
                r"[^a-zA-Z0-9\s]",
                ""),
            r"\s",
            "_"))

electricity_fact_df = MySpark\
    .spark\
    .read\
    .parquet("/Processed/ElectricityFactDF")

# Inner join keeps only facts that belong to the selected series; an outer
# join would add rows with null date/value for series that have no facts.
# With an inner join Spark can also honour the broadcast hint on the small,
# filtered dimension side.
electricity_df = electricity_fact_df.join(
    pysF.broadcast(electricity_dim_df),
    on = "series_id",
    how = "inner"
)

In [4]:
# Pivot columns: the distinct, non-empty fuel types in the joined data. They
# are sorted so column order is stable between runs. Passing them explicitly to
# pivot() also spares Spark the extra job it would run to discover them, and
# null fuel types never become a "null" column.
fuel_types_l = sorted(
    row["fuel_type"]
    for row in electricity_df.select("fuel_type").distinct().collect()
    if row["fuel_type"])
assert "all_fuels" in fuel_types_l, "expected an 'all_fuels' total fuel type"

# One row per (date, state) and one column per fuel type, holding the summed
# `value`. Missing combinations are filled with 0. "other" is the part of the
# all-fuels total not accounted for by the individual fuel-type columns.
electricity_pivoted_summary_df = electricity_df\
    .groupBy("date", "state")\
    .pivot("fuel_type", fuel_types_l)\
    .agg(pysF.sum("value"))\
    .fillna(0, subset=fuel_types_l)\
    .withColumn(
        "other",
        pysF.col("all_fuels") - sum(
            pysF.col(i) for i in fuel_types_l if i != "all_fuels"))

In [6]:
# Reshape the pivoted summary to long format (one fuel_type per row, via the
# project's melt helper) for plotting. The derived "other" column is included
# so the melted fuel types add up to the all-fuels total; "all_fuels" itself
# is excluded to avoid double counting.
electricity_formatted_df = MySpark.melt(
    df = electricity_pivoted_summary_df,
    id_vars = ["date", "state"],
    value_vars = [
        i for i in fuel_types_l
        if i and i not in ("all_fuels", "other")] + ["other"],
    var_name = 'fuel_type')

In [7]:
# TODO: plot electricity_formatted_df.
# TODO: use a window function (.over()) to compute each fuel type's percentage
#       of the state's total per year, for a relative (share) plot.