# Step 00 - Load Data in ADLS Gen2-Backed Delta Lake (Databricks)

This notebook retrieves a dataset from the UC Irvine ML Repository, splits it into two subsets (training and holdout), and saves each subset in Delta format to ADLS Gen2.

The logic below mounts the ADLS Gen2 container in Databricks; if the mount point already exists, it is unmounted and re-created. This requires an Azure service principal with the Storage Blob Data Contributor role on the ADLS Gen2 account.

### Import packages and load dataset

In [None]:
from ucimlrepo import fetch_ucirepo 
import pandas as pd
from datetime import datetime
  
# Download UCI ML Repository dataset id 849 ("Power Consumption of Tetouan City")
power_consumption_of_tetouan_city = fetch_ucirepo(id=849)

# Pull out the feature and target tables (provided as pandas dataframes)
X, y = (
    power_consumption_of_tetouan_city.data.features,
    power_consumption_of_tetouan_city.data.targets,
)

# Show the dataset-level metadata, then the per-variable information
print(power_consumption_of_tetouan_city.metadata)
print(power_consumption_of_tetouan_city.variables)


{'uci_id': 849, 'name': 'Power Consumption of Tetouan City', 'repository_url': 'https://archive.ics.uci.edu/dataset/849/power+consumption+of+tetouan+city', 'data_url': 'https://archive.ics.uci.edu/static/public/849/data.csv', 'abstract': 'This dataset is related to power consumption of three different distribution networks of Tetouan city which is located in north Morocco.', 'area': 'Social Science', 'tasks': ['Regression'], 'characteristics': ['Multivariate', 'Time-Series'], 'num_instances': 52417, 'num_features': 6, 'feature_types': ['Real', 'Integer'], 'demographics': [], 'target_col': ['Zone 1 Power Consumption', 'Zone 2  Power Consumption', 'Zone 3  Power Consumption'], 'index_col': None, 'has_missing_values': 'no', 'missing_values_symbol': None, 'year_of_dataset_creation': 2018, 'last_updated': 'Fri Mar 08 2024', 'dataset_doi': '10.24432/C5B034', 'creators': ['Abdulwahed Salam', 'Abdelaaziz El Hibaoui'], 'intro_paper': {'title': 'Comparison of Machine Learning Algorithms for the 

In [None]:
# Re-wrap the fetched feature and target tables as DataFrames
X = pd.DataFrame(power_consumption_of_tetouan_city.data.features)
y = pd.DataFrame(power_consumption_of_tetouan_city.data.targets)

# Place features and targets side by side (column-wise) in a single frame
merged_df = pd.concat((X, y), axis="columns")
merged_df

Unnamed: 0,DateTime,Temperature,Humidity,Wind Speed,general diffuse flows,diffuse flows,Zone 1 Power Consumption,Zone 2 Power Consumption,Zone 3 Power Consumption
0,1/1/2017 0:00,6.559,73.8,0.083,0.051,0.119,34055.69620,16128.87538,20240.96386
1,1/1/2017 0:10,6.414,74.5,0.083,0.070,0.085,29814.68354,19375.07599,20131.08434
2,1/1/2017 0:20,6.313,74.5,0.080,0.062,0.100,29128.10127,19006.68693,19668.43373
3,1/1/2017 0:30,6.121,75.0,0.083,0.091,0.096,28228.86076,18361.09422,18899.27711
4,1/1/2017 0:40,5.921,75.7,0.081,0.048,0.085,27335.69620,17872.34043,18442.40964
...,...,...,...,...,...,...,...,...,...
52411,12/30/2017 23:10,7.010,72.4,0.080,0.040,0.096,31160.45627,26857.31820,14780.31212
52412,12/30/2017 23:20,6.947,72.6,0.082,0.051,0.093,30430.41825,26124.57809,14428.81152
52413,12/30/2017 23:30,6.900,72.8,0.086,0.084,0.074,29590.87452,25277.69254,13806.48259
52414,12/30/2017 23:40,6.758,73.0,0.080,0.066,0.089,28958.17490,24692.23688,13512.60504


### Split dataset into two sections by date

Rows dated on or after 1 October 2017 form the "holdout" set; all earlier rows form the training set.

In [None]:
# Rows stamped on or after this instant go to the hold-out set; earlier rows are training data.
threshold = datetime(2017, 10, 1)

# Build the cleaned frame as a new object rather than mutating the existing one in place.
# - DateTime is parsed with an explicit month/day/year format (raw values look like
#   "12/30/2017 23:10"), so the result does not depend on pandas' format inference.
# - Spaces in column names are replaced with hyphens. Source names that contain a
#   double space (e.g. "Zone 2  Power Consumption") therefore end up with a double
#   hyphen ("Zone-2--Power-Consumption"); the names are deliberately left that way
#   because the tables saved from these frames carry them.
merged_df = merged_df.assign(
    DateTime=pd.to_datetime(merged_df['DateTime'], format='%m/%d/%Y %H:%M')
).rename(columns=lambda col: col.replace(' ', '-'))

# Independent copies, so later edits to one subset cannot alias the merged frame.
train_df = merged_df[merged_df['DateTime'] < threshold].copy()
holdout_df = merged_df[merged_df['DateTime'] >= threshold].copy()

# Every row must land in exactly one subset; a mismatch means some DateTime values
# compared False on both sides of the threshold (e.g. missing timestamps).
assert len(train_df) + len(holdout_df) == len(merged_df), \
    "date split lost rows - check for missing DateTime values"

train_df


Unnamed: 0,DateTime,Temperature,Humidity,Wind-Speed,general-diffuse-flows,diffuse-flows,Zone-1-Power-Consumption,Zone-2--Power-Consumption,Zone-3--Power-Consumption
0,2017-01-01 00:00:00,6.559,73.8,0.083,0.051,0.119,34055.69620,16128.87538,20240.96386
1,2017-01-01 00:10:00,6.414,74.5,0.083,0.070,0.085,29814.68354,19375.07599,20131.08434
2,2017-01-01 00:20:00,6.313,74.5,0.080,0.062,0.100,29128.10127,19006.68693,19668.43373
3,2017-01-01 00:30:00,6.121,75.0,0.083,0.091,0.096,28228.86076,18361.09422,18899.27711
4,2017-01-01 00:40:00,5.921,75.7,0.081,0.048,0.085,27335.69620,17872.34043,18442.40964
...,...,...,...,...,...,...,...,...,...
39307,2017-09-30 23:10:00,18.420,86.9,4.918,0.084,0.085,35426.54867,20204.15800,16256.26149
39308,2017-09-30 23:20:00,18.400,87.1,4.920,0.062,0.119,34738.40708,19785.03119,15973.85087
39309,2017-09-30 23:30:00,18.440,87.5,4.917,0.055,0.152,34196.81416,19246.15385,15620.83759
39310,2017-09-30 23:40:00,18.360,87.7,4.919,0.062,0.144,33438.58407,18789.60499,15173.68744


In [None]:
# Inspect the hold-out subset (rows whose DateTime is on or after the split threshold).
holdout_df

Unnamed: 0,DateTime,Temperature,Humidity,Wind-Speed,general-diffuse-flows,diffuse-flows,Zone-1-Power-Consumption,Zone-2--Power-Consumption,Zone-3--Power-Consumption
39312,2017-10-01 00:00:00,18.120,87.6,4.916,0.080,0.163,31755.62363,17985.06224,14898.96657
39313,2017-10-01 00:10:00,18.000,88.5,4.916,0.055,0.137,31175.84245,17611.61826,14531.30699
39314,2017-10-01 00:20:00,18.010,88.4,4.916,0.069,0.148,30154.92341,17148.54772,14321.21581
39315,2017-10-01 00:30:00,18.160,88.4,4.914,0.062,0.167,29764.20131,16827.38589,14198.66261
39316,2017-10-01 00:40:00,18.180,89.1,4.917,0.051,0.115,29600.35011,16621.99170,13889.36170
...,...,...,...,...,...,...,...,...,...
52411,2017-12-30 23:10:00,7.010,72.4,0.080,0.040,0.096,31160.45627,26857.31820,14780.31212
52412,2017-12-30 23:20:00,6.947,72.6,0.082,0.051,0.093,30430.41825,26124.57809,14428.81152
52413,2017-12-30 23:30:00,6.900,72.8,0.086,0.084,0.074,29590.87452,25277.69254,13806.48259
52414,2017-12-30 23:40:00,6.758,73.0,0.080,0.066,0.089,28958.17490,24692.23688,13512.60504


### Create or refresh mount point (Databricks to ADLS Gen2)

In [None]:
# Identify the ADLS Gen2 container and the service principal used to reach it.
# Replace each "..." placeholder with the value for your environment.
storage_account_name = "..."
container_name = "..."
client_id = "..."
tenant_id = "..."

# Read the service principal secret from a Databricks secret scope instead of
# pasting it into the notebook, so it is never stored in source or cell output.
secret_scope = "..."
secret_key = "..."
client_secret = dbutils.secrets.get(scope=secret_scope, key=secret_key)

# Construct the ADLS Gen2 URL (abfss://<container>@<account>.dfs.core.windows.net/)
adls_url = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/"

# OAuth client-credentials settings passed to the mount as extra_configs
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
}


In [None]:
# DBFS location under which the ADLS Gen2 container is exposed
mount_point = "/mnt/deltalake"

# Drop a pre-existing mount at this location so the mount below always starts clean
already_mounted = mount_point in [m.mountPoint for m in dbutils.fs.mounts()]
if already_mounted:
    dbutils.fs.unmount(mount_point)

# Attach the container using the OAuth settings prepared earlier.
# To check the result, list the contents with: dbutils.fs.ls(mount_point)
dbutils.fs.mount(source=adls_url, mount_point=mount_point, extra_configs=configs)


/mnt/deltalake has been unmounted.


True

### Convert pandas to spark dataframes and save in Delta format to ADLS Gen2

In [None]:
# Convert the pandas training subset into a Spark DataFrame so it can be written as Delta.
train_spark_df = spark.createDataFrame(train_df)
display(train_spark_df)

DateTime,Temperature,Humidity,Wind-Speed,general-diffuse-flows,diffuse-flows,Zone-1-Power-Consumption,Zone-2--Power-Consumption,Zone-3--Power-Consumption
2017-01-01T00:00:00Z,6.559,73.8,0.083,0.051,0.119,34055.6962,16128.87538,20240.96386
2017-01-01T00:10:00Z,6.414,74.5,0.083,0.07,0.085,29814.68354,19375.07599,20131.08434
2017-01-01T00:20:00Z,6.313,74.5,0.08,0.062,0.1,29128.10127,19006.68693,19668.43373
2017-01-01T00:30:00Z,6.121,75.0,0.083,0.091,0.096,28228.86076,18361.09422,18899.27711
2017-01-01T00:40:00Z,5.921,75.7,0.081,0.048,0.085,27335.6962,17872.34043,18442.40964
2017-01-01T00:50:00Z,5.853,76.9,0.081,0.059,0.108,26624.81013,17416.41337,18130.12048
2017-01-01T01:00:00Z,5.641,77.7,0.08,0.048,0.096,25998.98734,16993.31307,17945.06024
2017-01-01T01:10:00Z,5.496,78.2,0.085,0.055,0.093,25446.07595,16661.39818,17459.27711
2017-01-01T01:20:00Z,5.678,78.1,0.081,0.066,0.141,24777.72152,16227.35562,17025.54217
2017-01-01T01:30:00Z,5.491,77.3,0.082,0.062,0.111,24279.49367,15939.20973,16794.21687


In [None]:
# Convert the pandas hold-out subset into a Spark DataFrame so it can be written as Delta.
holdout_spark_df = spark.createDataFrame(holdout_df)
display(holdout_spark_df)

DateTime,Temperature,Humidity,Wind-Speed,general-diffuse-flows,diffuse-flows,Zone-1-Power-Consumption,Zone-2--Power-Consumption,Zone-3--Power-Consumption
2017-10-01T00:00:00Z,18.12,87.6,4.916,0.08,0.163,31755.62363,17985.06224,14898.96657
2017-10-01T00:10:00Z,18.0,88.5,4.916,0.055,0.137,31175.84245,17611.61826,14531.30699
2017-10-01T00:20:00Z,18.01,88.4,4.916,0.069,0.148,30154.92341,17148.54772,14321.21581
2017-10-01T00:30:00Z,18.16,88.4,4.914,0.062,0.167,29764.20131,16827.38589,14198.66261
2017-10-01T00:40:00Z,18.18,89.1,4.917,0.051,0.115,29600.35011,16621.9917,13889.3617
2017-10-01T00:50:00Z,18.29,89.0,4.919,0.062,0.133,29304.15755,16312.0332,13831.00304
2017-10-01T01:00:00Z,18.59,89.2,4.913,0.069,0.119,29323.06346,16158.92116,13667.59878
2017-10-01T01:10:00Z,18.75,88.9,4.916,0.073,0.096,28800.0,15848.96266,13661.76292
2017-10-01T01:20:00Z,18.61,88.8,4.917,0.048,0.063,28421.88184,15621.16183,13515.86626
2017-10-01T01:30:00Z,18.74,89.2,4.915,0.073,0.122,28308.44639,15602.48963,13422.4924


In [None]:
# Destination for the training data, under the mount point
output_path = f"{mount_point}/train"

# Write in Delta format, replacing whatever is already stored at that path
(
    train_spark_df.write
    .format("delta")
    .mode("overwrite")
    .save(output_path)
)

# List the written files as a quick check that the save happened
display(dbutils.fs.ls(output_path))

path,name,size,modificationTime
dbfs:/mnt/deltalake/train/_delta_log/,_delta_log/,0,1724206223000
dbfs:/mnt/deltalake/train/part-00000-ee2692a6-31cc-4667-ba10-d4f683967640.c000.snappy.parquet,part-00000-ee2692a6-31cc-4667-ba10-d4f683967640.c000.snappy.parquet,292832,1724206227000
dbfs:/mnt/deltalake/train/part-00001-07cb4653-210c-45d3-9da1-731b8e9ac26b.c000.snappy.parquet,part-00001-07cb4653-210c-45d3-9da1-731b8e9ac26b.c000.snappy.parquet,319747,1724206227000
dbfs:/mnt/deltalake/train/part-00002-94bae8cb-f8bb-4173-8b72-5bee4cdd9bb9.c000.snappy.parquet,part-00002-94bae8cb-f8bb-4173-8b72-5bee4cdd9bb9.c000.snappy.parquet,332082,1724206227000
dbfs:/mnt/deltalake/train/part-00003-4cf0ae7c-ee7d-4d27-8f15-4b6d52130951.c000.snappy.parquet,part-00003-4cf0ae7c-ee7d-4d27-8f15-4b6d52130951.c000.snappy.parquet,339683,1724206227000


In [None]:
# Destination for the hold-out data, under the mount point
output_path = f"{mount_point}/holdout"

# Write in Delta format, replacing whatever is already stored at that path
(
    holdout_spark_df.write
    .format("delta")
    .mode("overwrite")
    .save(output_path)
)

# List the written files as a quick check that the save happened
display(dbutils.fs.ls(output_path))

path,name,size,modificationTime
dbfs:/mnt/deltalake/holdout/_delta_log/,_delta_log/,0,1724206259000
dbfs:/mnt/deltalake/holdout/part-00000-e84b6df5-594a-424e-8fd1-a4b3a3712f39.c000.snappy.parquet,part-00000-e84b6df5-594a-424e-8fd1-a4b3a3712f39.c000.snappy.parquet,119697,1724206259000
dbfs:/mnt/deltalake/holdout/part-00001-6775cf1e-05c7-462f-8cb6-a17f3ac284fb.c000.snappy.parquet,part-00001-6775cf1e-05c7-462f-8cb6-a17f3ac284fb.c000.snappy.parquet,118200,1724206259000
dbfs:/mnt/deltalake/holdout/part-00002-5e72f603-c753-4da5-ad79-012dee035a82.c000.snappy.parquet,part-00002-5e72f603-c753-4da5-ad79-012dee035a82.c000.snappy.parquet,118353,1724206259000
dbfs:/mnt/deltalake/holdout/part-00003-2c0013a6-4764-4a6f-bd46-051a1a67ac09.c000.snappy.parquet,part-00003-2c0013a6-4764-4a6f-bd46-051a1a67ac09.c000.snappy.parquet,115956,1724206259000


In [None]:
# Show the first three training rows as a {column: {index: value}} dictionary
train_df.iloc[:3].to_dict()

{'DateTime': {0: Timestamp('2017-01-01 00:00:00'),
  1: Timestamp('2017-01-01 00:10:00'),
  2: Timestamp('2017-01-01 00:20:00')},
 'Temperature': {0: 6.559, 1: 6.414, 2: 6.313},
 'Humidity': {0: 73.8, 1: 74.5, 2: 74.5},
 'Wind-Speed': {0: 0.083, 1: 0.083, 2: 0.08},
 'general-diffuse-flows': {0: 0.051, 1: 0.07, 2: 0.062},
 'diffuse-flows': {0: 0.119, 1: 0.085, 2: 0.1},
 'Zone-1-Power-Consumption': {0: 34055.6962, 1: 29814.68354, 2: 29128.10127},
 'Zone-2--Power-Consumption': {0: 16128.87538, 1: 19375.07599, 2: 19006.68693},
 'Zone-3--Power-Consumption': {0: 20240.96386, 1: 20131.08434, 2: 19668.43373}}