<center><img src="image.png" width=500></center>
<p>

You've recently started a new position as a Data Engineer at an energy company. Previously, analysts on other teams had to manually retrieve and clean data every quarter to understand changes in the sales and capability of different energy types. This process normally took days and was something that most analysts dreaded. Your job is to automate this process by building a data pipeline. You'll write this data pipeline to pull data each month, providing more rapid insights and freeing up time for your data consumers.

You will achieve this using the `pandas` library and its powerful parsing features. You'll be working with two raw files: `electricity_sales.csv` and `electricity_capability_nested.json`.
    
Below, you'll find a data dictionary for the `electricity_sales.csv` dataset, which you'll be transforming in just a bit. Good luck!

| Field | Data Type |
| :---- | :-------: |
| period  | `str`        |
| stateid | `str` |
| stateDescription | `str` |
| sectorid | `str` |
| sectorName | `str` |
| price | `float` |
| price-units | `str` |

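The data dictionary above can be passed to `pd.read_csv` through its `dtype` parameter so that each column is parsed with the documented type. The sketch below is only illustrative: the two sample rows are made up, and `SALES_DTYPES` and `sample_csv` are hypothetical names that are not part of the project files.

```python
import io

import pandas as pd

# Column types taken from the data dictionary above.
SALES_DTYPES = {
    "period": str,
    "stateid": str,
    "stateDescription": str,
    "sectorid": str,
    "sectorName": str,
    "price": float,
    "price-units": str,
}

# Made-up sample rows, used only to keep this sketch self-contained.
sample_csv = io.StringIO(
    "period,stateid,stateDescription,sectorid,sectorName,price,price-units\n"
    "2023-01,CA,California,RES,residential,25.5,cents per kilowatt-hour\n"
    "2023-01,TX,Texas,TRA,transportation,,cents per kilowatt-hour\n"
)

# An empty `price` field is read as NaN, which the transform step later drops.
sample_sales_df = pd.read_csv(sample_csv, dtype=SALES_DTYPES)
print(sample_sales_df.dtypes)
```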
In [7]:
import pandas as pd
import json
from pathlib import Path

In [18]:
class InvalidFileFormatError(Exception):
    """The file does not have a supported format (.csv or .parquet)."""


def extract_tabular_data(file_path: str) -> pd.DataFrame:
    """Extract data from a tabular file format (.csv or .parquet) with pandas."""
    # Path is imported in the first cell; the suffix decides which reader to use.
    ext = Path(file_path).suffix.lower()
    if ext not in (".csv", ".parquet"):
        raise InvalidFileFormatError(
            f"Invalid file extension {ext!r}. Please try again with .csv or .parquet!"
        )

    try:
        if ext == ".csv":
            return pd.read_csv(file_path)
        return pd.read_parquet(file_path)
    except FileNotFoundError as e:
        raise FileNotFoundError(f"File not found: {file_path}") from e
    except Exception as e:
        # The extension is supported, but the contents could not be parsed.
        raise InvalidFileFormatError(
            f"The file {file_path} has extension {ext} but could not be read: {e}"
        ) from e
    
        

In [None]:
def extract_json_data(file_path: str) -> pd.DataFrame:
    """Extract and flatten data from a JSON file."""
    with open(file_path, "r") as file:
        content = json.load(file)
    # json_normalize flattens nested objects into dot-separated column names.
    return pd.json_normalize(content)
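
To see what the flattening step does, here is a minimal sketch on made-up nested records. The keys and values are hypothetical and are not taken from `electricity_capability_nested.json`; only `pd.json_normalize` itself comes from the function above.

```python
import pandas as pd

# Hypothetical nested records, invented for illustration.
sample_records = [
    {"period": "2023", "state": {"id": "CA", "name": "California"}, "capability": 10.5},
    {"period": "2023", "state": {"id": "TX", "name": "Texas"}, "capability": 20.0},
]

# Nested dictionaries become columns named "<parent>.<child>".
flat_df = pd.json_normalize(sample_records)
print(sorted(flat_df.columns))
```

If dotted column names are inconvenient downstream, `pd.json_normalize` also accepts a `sep` argument to choose a different separator.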
        
    

In [21]:
def transform_electricity_sales_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    """
    Transform electricity sales data, keeping price records for the
    residential and transportation sectors.
    
    To transform the electricity sales data, you'll need to do the following:
    - Drop any records with NA values in the `price` column.
    - Only keep records with a `sectorName` of "residential" or "transportation".
    - Create a `year` column using the first 4 characters of the values in `period`.
    - Create a `month` column using the last 2 characters of the values in `period`.
    - Return the transformed `DataFrame`, keeping only the columns `year`, `month`, `stateid`, `price` and `price-units`.
    """
    # Drop rows with a missing price; this returns a new DataFrame.
    cleaned = raw_data.dropna(subset=["price"])
    # Keep the two sectors of interest; copy to avoid SettingWithCopyWarning below.
    cleaned = cleaned[cleaned["sectorName"].isin(["residential", "transportation"])].copy()
    # The slices treat `period` as a string such as "2023-01".
    cleaned["year"] = cleaned["period"].str[:4]
    cleaned["month"] = cleaned["period"].str[-2:]

    return cleaned[["year", "month", "stateid", "price", "price-units"]]
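
The transformation steps can be checked on a small hand-written frame before running them on the real file. This sketch repeats the steps inline so that it runs on its own; the rows are invented, and the `period` values assume a `YYYY-MM` format.

```python
import pandas as pd

# Invented rows covering each case: a kept row, a missing price, an excluded sector.
toy_sales = pd.DataFrame(
    {
        "period": ["2023-01", "2023-02", "2023-03"],
        "stateid": ["CA", "TX", "NY"],
        "sectorName": ["residential", "transportation", "commercial"],
        "price": [25.5, None, 18.0],
        "price-units": ["cents per kilowatt-hour"] * 3,
    }
)

# The steps of transform_electricity_sales_data, applied inline
# (using isin for the sector filter).
toy_clean = toy_sales.dropna(subset=["price"])
toy_clean = toy_clean[toy_clean["sectorName"].isin(["residential", "transportation"])].copy()
toy_clean["year"] = toy_clean["period"].str[:4]
toy_clean["month"] = toy_clean["period"].str[-2:]
toy_clean = toy_clean[["year", "month", "stateid", "price", "price-units"]]

print(toy_clean)
```

Only the California row should survive: the Texas row has no price, and the New York row belongs to a sector outside the filter.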
    
    

In [23]:
# InvalidFileFormatError is defined in the extraction cell above and is reused here.
def load(dataframe: pd.DataFrame, file_path: str) -> None:
    """Load a DataFrame to a file in either CSV or Parquet format."""
    ext = Path(file_path).suffix.lower()
    if ext not in (".csv", ".parquet"):
        raise InvalidFileFormatError(
            f"Warning: {file_path} is not a valid file type. Please try again!"
        )

    try:
        if ext == ".csv":
            dataframe.to_csv(file_path, index=False)
        else:
            dataframe.to_parquet(file_path, index=False)
    except FileNotFoundError as e:
        # Handled before the generic case so that it stays reachable.
        raise FileNotFoundError(f"File not found: {file_path}") from e
    except Exception as e:
        raise InvalidFileFormatError(
            f"Could not write {file_path} as {ext}: {e}"
        ) from e
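
One way to sanity-check the load step is to write a small frame to a temporary directory and read it back. The sketch below uses only the CSV path so that it does not depend on a Parquet engine being installed; the frame and the file name `toy_sales.csv` are made up.

```python
import tempfile
from pathlib import Path

import pandas as pd

toy_df = pd.DataFrame({"year": ["2023"], "month": ["01"], "price": [25.5]})

with tempfile.TemporaryDirectory() as tmp_dir:
    out_path = Path(tmp_dir) / "toy_sales.csv"
    # index=False keeps the row index out of the file, as in `load`.
    toy_df.to_csv(out_path, index=False)
    file_was_written = out_path.is_file()
    # Read the identifier columns back as strings so "01" is not parsed as the integer 1.
    round_trip_df = pd.read_csv(out_path, dtype={"year": str, "month": str})

print(round_trip_df)
```

CSV does not store column types, so anything reading the loaded file needs to state them again, as the `dtype` argument does here.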
        


In [24]:
# Ready for the moment of truth? It's time to test the functions that you wrote!
raw_electricity_capability_df = extract_json_data("electricity_capability_nested.json")
raw_electricity_sales_df = extract_tabular_data("electricity_sales.csv")

cleaned_electricity_sales_df = transform_electricity_sales_data(raw_electricity_sales_df)

load(raw_electricity_capability_df, "loaded__electricity_capability.parquet")
load(cleaned_electricity_sales_df, "loaded__electricity_sales.csv")