# Read Azure Data Lake
Uses the Azure SDK libraries and Azure authentication to access a Data Lake storage account and read its files into a pandas DataFrame.

# Azure Login
To log in from your computer, run the following command in a terminal:  

<code>az login</code>  

A new window should then open where you can sign in. You only need to run this command once, unless your password changes.  
Dependencies
: [Azure CLI](https://learn.microsoft.com/en-us/cli/azure/install-azure-cli-windows?tabs=azure-cli)

# Azure Secret Documentation
[Keyvaults](https://learn.microsoft.com/en-us/python/api/overview/azure/keyvault-secrets-readme?view=azure-python#list-secrets)

In [92]:
# Import libraries

# For authentication
from azure.identity import DefaultAzureCredential

# For accessing Azure Key Vault
from azure.keyvault.secrets import SecretClient

# For accessing storage accounts
from azure.storage.blob import BlobServiceClient, generate_blob_sas, BlobSasPermissions

import datetime
import pandas as pd

In [93]:
# Global configuration shared by the rest of the notebook

# Address of the Key Vault that the secret client talks to
KEY_VAULT_URL = "https://keyvaultname.vault.azure.net/"

# Credential object handed to every Azure client created in this notebook
CREDENTIAL = DefaultAzureCredential()

# Client used by later cells to list and read secrets from the vault
SECRET_CLIENT = SecretClient(
    vault_url=KEY_VAULT_URL,
    credential=CREDENTIAL,
)

In [124]:
# Placeholder row illustrating the name / value / description layout of a secret
pd.DataFrame(
    {
        'Secret Name': ['secret'],
        'Secret Value': ['value'],
        'Secret Description': ['desc'],
    }
)

Unnamed: 0,Secret Name,Secret Value,Secret Description
0,secret,value,desc


In [139]:
# List all secrets and descriptions
# Gather one [name, content_type] row per secret and build the frame in a single
# call. Appending with pd.concat inside a loop re-copies every earlier row on
# each pass, and the comprehension keeps helper names out of the kernel namespace
# (the previous loop variable `property` also shadowed the builtin).
secret_rows = [
    [secret_properties.name, secret_properties.content_type]
    for secret_properties in SECRET_CLIENT.list_properties_of_secrets()
]
# The secret's content_type field is what gets shown as its description
AzureSecretsDF = pd.DataFrame(secret_rows, columns=['Secret Name', 'Secret Description'])
AzureSecretsDF

Unnamed: 0,Secret Name,Secret Description
0,ADLS-StorageAccount-Name,Data Lake Storage Account Name - sgdataanalyti...
1,ced365export2dlprod-StorageAccount-Name,
2,Databricks-eventhub-key,SAS Policy key for Event Hub
3,eh-primary-key,
4,IOTC-EventHub-8Part-ConnectionString,Event Hub with 8 Partitions SAS Connection String
5,iotc-eventhub-8partitions-sas-key,sas key to eventhub
6,IOTC-StorageAccount-AccessKey,Access Key for IOTCTSIStorage
7,IOTC-StorageAccount-Name,Data Lake Storage Account Name - iotctsistorage
8,SECRET-NAME-TEST,This is the description of your secret/name
9,SGAnalyticsAccessKey,SG Analytics Storage Account Access Key


In [95]:
# Example Secret Pull
SECRET_NAME = 'SECRET-NAME-TEST'

# One get_secret() call returns the name, value and content type together,
# so fetch the secret once and read all three fields from the same object.
example_secret = SECRET_CLIENT.get_secret(SECRET_NAME)
print(example_secret.properties.name)
# NOTE(review): this writes the secret value into the saved cell output; fine for
# a test secret, but clear the output before sharing if a real secret is used.
print(example_secret.value)
print(example_secret.properties.content_type)

SECRET-NAME-TEST
YOUR SECRET
This is the description of your secret/name


In [109]:
# Storage account name, read from Key Vault
ACCOUNT_NAME = SECRET_CLIENT.get_secret('IOTC-StorageAccount-Name').value
# Blob endpoint URL built from that storage account name
ACCOUNT_URL = 'https://' + ACCOUNT_NAME + '.blob.core.windows.net'
# Access key of the storage account, read from Key Vault
STORAGE_ACCOUNT_ACCESSKEY = SECRET_CLIENT.get_secret('IOTC-StorageAccount-AccessKey').value

In [97]:
# Service-level blob client for the storage account, using the shared credential
Blob_SC = BlobServiceClient(account_url=ACCOUNT_URL, credential=CREDENTIAL)

In [98]:
# Example of setting a 1 hour time limit from now
print(f'Now Time: {datetime.datetime.now(datetime.UTC)}')
print(f'Time example with + 1 hours: {datetime.datetime.now(datetime.UTC) + datetime.timedelta(hours=2)}')

Now Time: 2024-05-25 04:37:04.771493+00:00
Time example with + 1 hours: 2024-05-25 06:37:04.771493+00:00


In [99]:
# Collect the name of every container in the storage account
container_list = [entry['name'] for entry in Blob_SC.list_containers()]
container_list

['env-781a063f-8041-4049-8a73-d69954ceaabe',
 'eventhub-telemetry',
 'iotcentral-raw',
 'raw-dev',
 'testcontainer']

In [100]:
# Container whose blobs are read below
CONTAINER_NAME = "env-781a063f-8041-4049-8a73-d69954ceaabe"
# Read-only permission used when generating SAS tokens
PERMISSION = BlobSasPermissions(read=True)
# Expiry timestamp: one hour after this cell is executed (UTC)
TOKEN_EXPIRATION = datetime.timedelta(hours=1) + datetime.datetime.now(datetime.UTC)

In [101]:
# Folder names within container
container_client = Blob_SC.get_container_client(CONTAINER_NAME)
blob_names = [name for name in container_client.list_blob_names()]
# Strip the last '/'-separated segment (the file name) from each blob name;
# building a set keeps each folder path only once.
all_folders = {name.rpartition('/')[0] for name in blob_names}

In [102]:
# Name of the folder whose blobs/files you want to retrieve
FOLDER = "V=1"

In [103]:
# Get parquet files inside FOLDER and show top 5 files/blobs
# An empty FOLDER means "search the whole container".
folder_prefix = f"{FOLDER.rstrip('/')}/" if FOLDER else ''
blob_list = [
    blob
    for blob in blob_names
    # Test the file extension rather than searching for the substring "parquet",
    # which would also match any blob that merely has "parquet" in its path.
    if blob.startswith(folder_prefix) and blob.lower().endswith('.parquet')
]
blob_list[:5]

['V=1/PT=Live/Y=2021/M=05/20210528183415090_20210528183352018_20210528183401894_2c64f002_00.parquet',
 'V=1/PT=Live/Y=2024/M=05/20240524024752820_20240524024729686_20240524024752718_5d56e7ca_00.parquet',
 'V=1/PT=Live/Y=2024/M=05/20240524024815985_20240524024752936_20240524024815782_5d56e7ca_00.parquet',
 'V=1/PT=Live/Y=2024/M=05/20240524024839022_20240524024815907_20240524024838954_5d56e7ca_00.parquet',
 'V=1/PT=Live/Y=2024/M=05/20240524024902125_20240524024838986_20240524024902002_5d56e7ca_00.parquet']

In [104]:
# Create a function to generate a SAS token for each file individually
def get_sas(account_name, account_key, countainer_name, blob_name, expiry, permission):
    """Return the SAS token that `generate_blob_sas` produces for one blob.

    Every argument is forwarded unchanged to `generate_blob_sas`;
    `countainer_name` is passed as its `container_name` keyword.
    """
    return generate_blob_sas(
        account_name=account_name,
        account_key=account_key,
        container_name=countainer_name,
        blob_name=blob_name,
        expiry=expiry,
        permission=permission,
    )

In [105]:
# Create a function to create SAS URL in order to access file
def create_sas_url(saskey, account_name, container_name, blob_name):
    """Build the HTTPS URL of a blob with its SAS token as the query string.

    Parameters
    ----------
    saskey : str
        SAS token, appended verbatim after the '?'.
    account_name : str
        Storage account name, used as the host prefix.
    container_name : str
        Container that holds the blob.
    blob_name : str
        Blob name (path inside the container), given un-encoded.

    Returns
    -------
    str
        'https://<account>.blob.core.windows.net/<container>/<blob>?<saskey>'
    """
    from urllib.parse import quote

    # Percent-encode characters that would otherwise break the URL (spaces, '#',
    # '?', '%', ...). '/' and '=' stay literal so names such as 'V=1/PT=Live/...'
    # yield exactly the same URL as plain concatenation.
    encoded_blob = quote(blob_name, safe='/=')
    return f'https://{account_name}.blob.core.windows.net/{container_name}/{encoded_blob}?{saskey}'

In [115]:
# Create a function to create a dataframe after aggregating all parquet files
def create_df_from_parquets(account_name, account_key, countainer_name, expiry, permission, loadnumber=None):
    """Read parquet blobs listed in the global `blob_list` into one DataFrame.

    For each selected blob a SAS token is generated with `get_sas`, turned into
    a URL with `create_sas_url`, and read with `pd.read_parquet`.

    Parameters
    ----------
    account_name, account_key, countainer_name, expiry, permission
        Forwarded to `get_sas` (and, where applicable, `create_sas_url`).
    loadnumber : int or None, optional
        Number of entries to take from the front of `blob_list`. None (the
        default) loads every entry. The default is resolved when the function
        is called, so it follows later changes to `blob_list` and does not
        require `blob_list` to exist when this cell is run.

    Returns
    -------
    pandas.DataFrame
        The frames concatenated in `blob_list` order with their original
        indexes, or an empty DataFrame when no blob was selected.
    """
    frames = []
    for blob in blob_list[:loadnumber]:
        sas_token = get_sas(
            account_name=account_name,
            account_key=account_key,
            countainer_name=countainer_name,
            blob_name=blob,
            expiry=expiry,
            permission=permission,
        )
        url = create_sas_url(
            account_name=account_name,
            saskey=sas_token,
            container_name=countainer_name,
            blob_name=blob,
        )
        frames.append(pd.read_parquet(url))
        print(f'Added: {blob}')

    # pd.concat raises on an empty list, so return an empty frame in that case
    if not frames:
        return pd.DataFrame()
    # Concatenate once at the end instead of re-copying the result on every pass
    return pd.concat(frames)

In [117]:
# Load the first five entries of blob_list into a single DataFrame
raw = create_df_from_parquets(
    account_name=ACCOUNT_NAME,
    account_key=STORAGE_ACCOUNT_ACCESSKEY,
    countainer_name=CONTAINER_NAME,
    expiry=TOKEN_EXPIRATION,
    permission=PERMISSION,
    loadnumber=5,
)

Added: V=1/PT=Live/Y=2021/M=05/20210528183415090_20210528183352018_20210528183401894_2c64f002_00.parquet
Added: V=1/PT=Live/Y=2024/M=05/20240524024752820_20240524024729686_20240524024752718_5d56e7ca_00.parquet
Added: V=1/PT=Live/Y=2024/M=05/20240524024815985_20240524024752936_20240524024815782_5d56e7ca_00.parquet
Added: V=1/PT=Live/Y=2024/M=05/20240524024839022_20240524024815907_20240524024838954_5d56e7ca_00.parquet
Added: V=1/PT=Live/Y=2024/M=05/20240524024902125_20240524024838986_20240524024902002_5d56e7ca_00.parquet


In [118]:
# Preview the first five rows of the combined frame
raw.head(n=5)

Unnamed: 0,timestamp,applicationId_string,messageSource_string,deviceId_string,schema_string,templateId_string,enqueuedTime_datetime,telemetry.LINEPSI_double,telemetry.LINETEMPEGUMIN_long,telemetry.ODORANTLEVELEGUMAX_long,...,telemetry.METHANE7TROUBLEALM_bool,telemetry.METHANE7WARNING_bool,telemetry.LOWBANKPSI_string,telemetry.STAGE1BTEMP_double,telemetry.FEEDTEMP_long,telemetry.STAGE4TEMP_string,telemetry.STAGE4PSI_string,telemetry.SUCTIONPSI_string,telemetry.INLETPSI_long,telemetry.HIGHBANKPSI_string
0,2021-05-28 18:33:58.159000+00:00,d4670217-6f19-4d8c-a468-7024604f11ee,telemetry,20158-cpa1,default@v1,dtmi:modelDefinition:fr1a7yekw:ek9gf77h4,2021-05-28 18:33:56.867000+00:00,,,,...,,,,,,,,,,
1,2021-05-28 18:33:53.393000+00:00,d4670217-6f19-4d8c-a468-7024604f11ee,telemetry,20158-cpa2,default@v1,dtmi:modelDefinition:fr1a7yekw:ek9gf77h4,2021-05-28 18:33:51.871000+00:00,,,,...,,,,,,,,,,
2,2021-05-28 18:33:56.737000+00:00,d4670217-6f19-4d8c-a468-7024604f11ee,telemetry,20158-cpa3,default@v1,dtmi:modelDefinition:fr1a7yekw:ek9gf77h4,2021-05-28 18:33:55.368000+00:00,,,,...,,,,,,,,,,
3,2021-05-28 18:33:58.957000+00:00,d4670217-6f19-4d8c-a468-7024604f11ee,telemetry,20158-cpa4,default@v1,dtmi:modelDefinition:fr1a7yekw:ek9gf77h4,2021-05-28 18:33:57.777000+00:00,,,,...,,,,,,,,,,
4,2021-05-28 18:34:01.894000+00:00,d4670217-6f19-4d8c-a468-7024604f11ee,telemetry,20158-cpa5,default@v1,dtmi:modelDefinition:fr1a7yekw:ek9gf77h4,2021-05-28 18:34:00.460000+00:00,,,,...,,,,,,,,,,
