#### This is a Databricks Notebook for Data Preparation & Feature Extraction for POC Experimentation
In the initial phase of developing an ML solution, experimentation is needed before settling on the best approach. This notebook provides a framework for creating an initial snapshot of the historical training data to be used for POC experimentation. <br>

**Input(s)**: Various data sources from Datalake & Deltalake <br>
**Output(s)**: Consolidated dataset to be saved back to Datalake<br>
**Recommended Cluster** : DBS_MAIN

#### Step 1: Installing the required libraries.
* This step installs the libraries required during development. <br/>
* Databricks clusters come with commonly used libraries pre-installed. Use this step to install additional libraries or to pin specific versions. <br/>
* Installing libraries at the cluster level is not recommended, as it can cause version clashes for other users of the cluster.
* dbutils.library.installPyPI("*pkg-name*", version="*x.xx.x*") 
* This call installs a package; the version number is optional but recommended.

In [0]:
dbutils.library.installPyPI("pandas", version="0.23.4") ## Example
dbutils.library.installPyPI("scikit-learn", version="0.21.3")

dbutils.library.restartPython() ## Restarting the Python shell, will clear all variables
print(dbutils.library.list())   ## Printing the list of packages 

#### Step 2.1: Importing required libraries

In [0]:
import pandas as pd
import glob, os
from datetime import datetime, timedelta

#### Step 2.2: Accessing Key Vault
* Do not store passwords, API keys, credentials, secrets, etc. as hard-coded text in the code, both for security and for easier maintenance.
* All required confidential information should be stored in the key vault.
* The confidential information can then be retrieved from Databricks using the following code.
* dbutils.secrets.get(scope = "KEY_VAULT", key = "*SecretName*")

In [0]:
# A scope is required for every dbutils call that retrieves a secret.
akv_scope = "KEY_VAULT"

dbutils.secrets.get(scope = akv_scope, key = "keyname") ## replace key with the name of the secret within the key vault

In [0]:
######################### Start of usecase specific codes #########################
## * Assumption is that the datalake is already mounted on databricks

Data_path = '/dbfs/mnt/path_in_datalake' ## Path where raw data is stored 
Experiment_path = '/dbfs/mnt/path_in_datalake to store experiment'

## Reading from gen 1 datalake (using pandas)
filelist = glob.glob(Data_path + "/*_full.csv") ## only files whose names end in "_full.csv"
monthly_dfs = [pd.read_csv(csv) for csv in filelist] ## one dataframe per file
Merged_df = pd.concat(monthly_dfs)

#### Step 3.2: Reading Data from DeltaLake
* Essentially, the deltalake is just a folder within the datalake which stores data in a table format. 
* This makes reading data much easier and more logical than looping through multiple CSV files, as is the case for a conventional file-based datalake.
* Deltalake allows SQL-like filtering and selection commands through pyspark to return a subset of the required data.
* The deltalake resides/starts in the path which contains a *"_delta_log"* folder. <br>
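
As a rough illustration of the last point, the sketch below checks whether a folder directly contains a `_delta_log` subfolder. The helper name `looks_like_delta_table` and the temporary directories are hypothetical and exist only for this demonstration. The helper uses the local file API, so on Databricks it would presumably need the `/dbfs/...` form of a mounted path, as in the pandas paths used elsewhere in this notebook.

```python
import os
import tempfile

def looks_like_delta_table(path):
    """Return True if `path` directly contains a `_delta_log` folder."""
    return os.path.isdir(os.path.join(path, "_delta_log"))

# Demonstration on throwaway directories (not real Delta tables).
with tempfile.TemporaryDirectory() as root:
    plain_dir = os.path.join(root, "raw_csv")      # ordinary folder of files
    delta_dir = os.path.join(root, "delta_table")  # folder with a _delta_log
    os.makedirs(plain_dir)
    os.makedirs(os.path.join(delta_dir, "_delta_log"))
    plain_result = looks_like_delta_table(plain_dir)
    delta_result = looks_like_delta_table(delta_dir)

print(plain_result, delta_result)
```

This only tests for the presence of the folder; it does not validate the transaction log inside it.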



In [0]:
### Loading the deltalake path into pyspark. All paths used are sample paths
spark_df = spark.read.format("delta").load('/mnt/INDOD')

### Applying filtering actions on the spark dataframe
cutoff = datetime.utcnow() - timedelta(days=10) ## keep only the last 10 days
filtered_spark_df = (
  spark_df.filter(spark_df['src_timestamp_utc'] >= cutoff)
  # .filter(spark_df['gen_settlement_period'].isin(['12','13','14']))  ## optional extra filters
  # .filter(spark_df['gen_settlement_period'] == '12')
)

### Converting the spark df into a pandas dataframe (collects all rows to the driver)
PandasDF = filtered_spark_df.toPandas()

#### Step 4: Start of Feature Engineering/Extraction methods
* In this step we will apply use-case specific feature engineering methods.

In [0]:
### Applying feature extraction methods
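Merged_df.index = pd.to_datetime(Merged_df.SETTLEMENTDATE, format='%Y/%m/%d %H:%M:%S')

## Drop repeated records while SETTLEMENTDATE is still a column, so rows only count as duplicates when their timestamps match too
Merged_df = Merged_df.drop_duplicates()
Merged_df.drop('SETTLEMENTDATE',axis=1,inplace=True)
Merged_df = Merged_df.resample('30min').ffill() ## 30-minute grid ('30min' is the same frequency as '30T'); gaps are forward-filled

## Derive calendar features after resampling so forward-filled rows get the features of their own timestamp
Merged_df['Month'] = Merged_df.index.month
Merged_df['Day'] = Merged_df.index.day
Merged_df['Day_of_week'] = Merged_df.index.dayofweek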
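
To make the meaning of the `Month`, `Day` and `Day_of_week` columns concrete, here is a minimal standard-library sketch that parses one timestamp in the same `'%Y/%m/%d %H:%M:%S'` format and derives the same three values. The function name `calendar_features` and the sample timestamp are made up for illustration. Like the pandas `dayofweek` attribute, `datetime.weekday()` counts Monday as 0 and Sunday as 6.

```python
from datetime import datetime

def calendar_features(timestamp_text):
    """Parse a 'YYYY/MM/DD HH:MM:SS' string and return its calendar features."""
    ts = datetime.strptime(timestamp_text, "%Y/%m/%d %H:%M:%S")
    return {
        "Month": ts.month,            # 1..12
        "Day": ts.day,                # day of the month, 1..31
        "Day_of_week": ts.weekday(),  # Monday=0 ... Sunday=6
    }

sample = calendar_features("2020/03/02 00:30:00")  # a Monday
print(sample)  # {'Month': 3, 'Day': 2, 'Day_of_week': 0}
```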

#### Step 5: Writing the consolidated dataset back into the data lake
* Based on the defined process, aggregated data will be written into the project specific curated folder in the datalake
* In this case the data will be written to '/dbfs/mnt/dlgen2/curated/energy/data_science/experiments/au/training_demo/POC' (the experiment path).
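
The dated output file names can be built as in the following sketch. The helper `dated_output_path` and the folder `/dbfs/mnt/example_experiment` are hypothetical placeholders, not paths from this project. Using `os.path.join` means the result does not depend on whether the folder string ends with a separator.

```python
import os
from datetime import datetime

def dated_output_path(folder, prefix, run_date):
    """Return '<folder>/<prefix>_YYYYMMDD.csv' for the given run date."""
    file_name = prefix + run_date.strftime("_%Y%m%d.csv")
    return os.path.join(folder, file_name)

run_date = datetime(2020, 5, 17)  # fixed date so the example is reproducible
train_path = dated_output_path("/dbfs/mnt/example_experiment", "training_data", run_date)
test_path = dated_output_path("/dbfs/mnt/example_experiment", "testing_data", run_date)
print(train_path)
print(test_path)
```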

In [0]:
RunDate = datetime.utcnow() ## Initializing a run date so as to name the files accordingly

## Create the folder if it does not exist yet (no error if it already does)
os.makedirs(Experiment_path, exist_ok=True)

#### It is easier to split the train and test sets first to accommodate Azure AutoML later
from sklearn.model_selection import train_test_split
TrainDF, TestDF = train_test_split(Merged_df, test_size=0.2, shuffle=False) ## shuffle=False keeps time order: the last 20% of rows form the test set

## os.path.join adds the separator between the folder and the file name
TrainDF.to_csv(os.path.join(Experiment_path, RunDate.strftime('training_data_%Y%m%d.csv')))
TestDF.to_csv(os.path.join(Experiment_path, RunDate.strftime('testing_data_%Y%m%d.csv')))

### For custom cross-validation in Azure AutoML
For AzureML Python SDK version 1.6.0 or later, using our own custom cross-validation method requires creating additional columns that indicate which rows to use for training and which for validation in each fold. Each column represents one cross-validation split and is filled with the integer values 1 or 0, where 1 indicates the row should be used for training and 0 indicates the row should be used for validation.
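
The column layout described above can be sketched without any libraries. This is a hand-rolled expanding-window split written only for illustration, not scikit-learn's `TimeSeriesSplit`, and the names `expanding_window_folds` and `cv_indicator_columns` are hypothetical. Rows that fall after a fold's validation window are left at 0 here, so under the 1/0 convention they count as validation rows.

```python
def expanding_window_folds(n_rows, n_splits):
    """Yield (train_indices, valid_indices) with a growing training window."""
    fold_size = n_rows // (n_splits + 1)
    for k in range(1, n_splits + 1):
        train_end = k * fold_size
        train_idx = list(range(train_end))
        valid_idx = list(range(train_end, train_end + fold_size))
        yield train_idx, valid_idx

def cv_indicator_columns(n_rows, n_splits):
    """Build one 1/0 list per fold: 1 = training row, 0 = everything else."""
    columns = {}
    for i, (train_idx, _valid_idx) in enumerate(expanding_window_folds(n_rows, n_splits)):
        train_set = set(train_idx)
        columns["cv" + str(i + 1)] = [1 if row in train_set else 0 for row in range(n_rows)]
    return columns

columns = cv_indicator_columns(n_rows=8, n_splits=3)
for name, flags in columns.items():
    print(name, flags)
# cv1 [1, 1, 0, 0, 0, 0, 0, 0]
# cv2 [1, 1, 1, 1, 0, 0, 0, 0]
# cv3 [1, 1, 1, 1, 1, 1, 0, 0]
```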

In [0]:
from sklearn.model_selection import TimeSeriesSplit
cv = TimeSeriesSplit(n_splits=10)

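TrainDF = TrainDF.copy() ## work on an explicit copy to avoid a SettingWithCopyWarning when adding columns
for i, (train_ind, valid_ind) in enumerate(cv.split(TrainDF)):
  col = 'cv'+str(i+1)
  TrainDF[col] = 0 ## default 0 (validation); this also covers rows after the fold's validation window
  TrainDF.iloc[train_ind, TrainDF.columns.get_loc(col)] = 1 ## 1 marks the training rows of this fold

## Overwrites the earlier training file, now including the cv columns
TrainDF.to_csv(os.path.join(Experiment_path, RunDate.strftime('training_data_%Y%m%d.csv')))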