# InfluxDB to ADLS PoC Enhancement

- This notebook copies data stored in InfluxDB to Azure Data Lake Storage Gen2, either converted to a service-specific format or as-is.
Supported services: DFL, DataAPI
- To send the data as-is, set the service name to `DFL_SimpleJSON` or `DataAPI_SimpleJSON`.

### Libraries

In [None]:
# Import libraries for connecting to Azure Gen2 Storage and InfluxDB
import os
from datetime import datetime
import json
from dateutil.tz import tzlocal

import influxdb_client
from dateutil.tz import tzlocal
from azure.storage.filedatalake import (
    DataLakeServiceClient,
    DataLakeDirectoryClient,
    FileSystemClient
)
from azure.identity import DefaultAzureCredential

import concurrent.futures

### Parameters

In [None]:
def _with_midnight(value):
    """Return `value` unchanged if it contains 'T', else append a midnight UTC time part."""
    return value if 'T' in value else value + "T00:00:00Z"


# Notebook parameters are supplied through Databricks widgets.
tenant_name = dbutils.widgets.get("tenantName")
service_name = dbutils.widgets.get("serviceName")
# Date-only inputs are expanded to a full timestamp at 00:00:00Z.
start_at = _with_midnight(dbutils.widgets.get("startDateTime"))
stop_at = _with_midnight(dbutils.widgets.get("endDateTime"))

print(start_at, stop_at)
print(tenant_name,service_name)

2017-02-01T00:00:00Z 2017-02-02T00:00:00Z
SOLAR DFL


In [None]:
# Create a Data Lake Service Client
account_url = f""
credential = ""
client = DataLakeServiceClient(account_url, credential)

# Each tenant gets its own container (file system), named after the
# lower-cased tenant name; it is created the first time the tenant is seen.
file_system_client = client.get_file_system_client(tenant_name.lower())
if file_system_client.exists():
    print("Tenant container exists.")
else:
    file_system_client.create_file_system()

Tenant container exists


In [None]:
# InfluxDB connection settings. Each *_SimpleJSON service shares the
# configuration of its base service.
# NOTE(review): the values below are blank placeholders; prefer loading the
# token from a secret store rather than committing it to the notebook.
if service_name in ("DFL", "DFL_SimpleJSON"):
    # DFL Config
    bucket = ""
    org = ""
    token = ""
    # Store the URL of your InfluxDB instance
    url = ""
elif service_name in ("DataAPI", "DataAPI_SimpleJSON"):
    # Data API Config
    bucket = ""
    org = ""
    token = ""
    # Store the URL of your InfluxDB instance
    url = ""
else:
    # Without this branch an unknown service would leave bucket/org/token/url
    # undefined (NameError on a fresh kernel) or, worse, silently reuse the
    # values left over from a previous run of this cell.
    raise ValueError(
        f"Unsupported serviceName {service_name!r}; expected one of "
        "DFL, DFL_SimpleJSON, DataAPI, DataAPI_SimpleJSON"
    )

# Create InfluxDB client
# NOTE: this rebinds `client`, which until now referred to the
# DataLakeServiceClient created in the previous cell. The name is kept because
# later cells still pass `client` around.
client = influxdb_client.InfluxDBClient(
    url=url,
    token=token,
    org=org
)

query_api = client.query_api()


### Definitions

**1. Get Measurements List From InfluxDB**

In [None]:
# Get measurements list
def get_measurement_list(bucket, start_at, stop_at):
    """
    List the distinct measurement names present in a bucket for a time range.
    Args:
    - bucket: Name of the InfluxDB bucket to query.
    - start_at: Start of the query range (interpolated into the Flux query as-is).
    - stop_at: End of the query range (interpolated into the Flux query as-is).
    Returns:
    - A list of measurement names, each appearing once, in first-seen order.
    """
    flux = f"""
        import "date"         
        from(bucket: "{bucket}")         
        |> range(start: {start_at}, stop: {stop_at})         
        |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")         
        |> keep(columns: ["_measurement"])         
        |> distinct()
        """
    tables = query_api.query(query=flux)
    # dict.fromkeys keeps the first occurrence of every name and preserves order.
    names = (row["_measurement"] for tbl in tables for row in tbl.records)
    return list(dict.fromkeys(names))


**2. Query Data For A Specific Measurement**

In [None]:
def query_measurement(bucket, measurement, start_at, stop_at):
    """
    Fetch every row of one measurement from a bucket within a time range.
    The Flux query pivots fields into columns (one row per `_time`) and drops
    the `_start` / `_stop` columns.
    Args:
    - bucket: Name of the InfluxDB bucket to query.
    - measurement: Name of the measurement to select.
    - start_at: Start of the query range (interpolated into the Flux query as-is).
    - stop_at: End of the query range (interpolated into the Flux query as-is).
    Returns:
    - Whatever `query_api.query` returns for this Flux query.
    """
    flux = f"""
        import "date"         
        from(bucket: "{bucket}")         
        |> range(start: {start_at}, stop: {stop_at})
        |> filter(fn: (r) => r["_measurement"] == "{measurement}")         
        |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")         
        |> drop(columns: ["_start", "_stop"])
        """
    return query_api.query(query=flux)


**3. Process Data to Generate JSON**

In [None]:
# Parsed schema files keyed by path, so the DataAPI branch reads the schema
# from disk once instead of once per record.
_SCHEMA_CACHE = {}


def _load_schema(path='/dbfs/FileStore/schema.json'):
    """Return the parsed JSON schema stored at `path`, reading the file only on first use."""
    if path not in _SCHEMA_CACHE:
        with open(path, 'r') as file:
            _SCHEMA_CACHE[path] = json.load(file)
    return _SCHEMA_CACHE[path]


def process_data(record):
    """
    Convert one query record into a JSON-serialisable payload and its ADLS directory path.
    The payload layout depends on the notebook-level `service_name`:
    - "DFL": {"Value", "Timestamp"} plus an optional "Attributes" list built
      from the record's `attr_<name>` / `attr_<name>_semanticref` keys.
    - "DataAPI": {"data": {"timestamp", "fields", "attributes"}}, where a key is
      included only if it is named in the schema file.
    - anything else (the *_SimpleJSON services): the record's values as-is,
      minus "result" / "table" and minus keys whose value is None.
    Args:
    record: A record exposing a `values` dict and `record[key]` lookup; "_time"
        must be a datetime.
    Returns:
    list: [file_path, json_data]. `file_path` has minute resolution, so records
        from the same measurement, series and minute share one path.
    The record itself is not modified.
    """
    record_time = record["_time"]
    timestamp = record_time.isoformat()

    if service_name == "DFL":
        atts_arr = []
        att_curr = None
        json_data = {"Value": record["data"], "Timestamp": timestamp}
        # Check each key
        for key, value in record.values.items():
            if not key.startswith("attr_"):
                continue
            att = key.split('_')
            # Add attribute name and value
            if att_curr != att[1] and value is not None:
                atts_arr.append({"Name": att[1], "Value": value, "SemanticRef": None})
                att_curr = att[1]
            # Add semantic ref to the last added attribute.
            # The length check avoids an IndexError on keys with no third part.
            elif att_curr == att[1] and len(att) > 2 and att[2] == "semanticref" and atts_arr:
                atts_arr[-1]["SemanticRef"] = value
        if atts_arr:
            json_data["Attributes"] = atts_arr

    elif service_name == "DataAPI":
        schema = _load_schema()
        # Field and attribute names declared in the schema
        sch_fields = {e["name"] for e in schema["fields"]}
        sch_atts = {a["name"] for a in schema["attributes"]}

        out_fields = []
        out_atts = []
        data = {"timestamp": timestamp}
        for key, value in record.values.items():
            # Attributes take precedence when a name is declared as both.
            if key in sch_atts:
                out_atts.append({"name": key, "value": value})
            elif key in sch_fields:
                out_fields.append({"name": key, "value": value})

        # Adding fields and attributes to the data dictionary if they exist
        if out_fields:
            data["fields"] = out_fields
        if out_atts:
            data["attributes"] = out_atts

        json_data = {"data": data}

    # If service name: DFL_SimpleJSON or DataAPI_SimpleJSON
    else:
        # Work on a copy: mutating record.values in place replaced "_time" with
        # a string, which then broke the path computation below.
        data = dict(record.values)
        data.pop("result", None)
        data.pop("table", None)
        data["_time"] = timestamp
        json_data = {key: value for key, value in data.items() if value is not None}

    # Create a specific path for the telemetry data
    storage = record.values.get("_measurement")
    deviceId = record.values.get("series_id")
    # e.g. 2024-01-26T06:12:00+00:00 -> 2024/01/26/06/12
    time_path = record_time.strftime("%Y/%m/%d/%H/%M")
    file_path = f"/staging/influx/{service_name}/telemetry/{storage}/{deviceId}/{time_path}"

    return [file_path, json_data]

**4. Send JSON Data to Azure Data Lake**

In [None]:
def _entry_timestamp(entry):
    """Return the timestamp that identifies `entry` under the current `service_name`."""
    if service_name == "DFL":
        return entry["Timestamp"]
    if service_name == "DataAPI":
        return entry["data"]["timestamp"]
    # *_SimpleJSON services keep the raw "_time" key
    return entry["_time"]


def send_to_ADLS(client, file_path, json_data):
    """
    Store one JSON payload in `telemetry.json` under `file_path` in ADLS.
    The file holds a JSON list. If an entry with the same timestamp already
    exists it is replaced in place; otherwise the new payload is put at the
    front of the list. The directory and the file are created when missing.
    Args:
    client: Unused; kept for backward compatibility. The upload goes through
        the notebook-level `file_system_client`.
    file_path (str): Directory path in ADLS that holds `telemetry.json`.
    json_data (dict): The payload to store, in the layout of the current `service_name`.
    Returns:
    None
    Note:
    This is a read-modify-write of a whole file and is not safe to run
    concurrently for the same `file_path`.
    """
    # Get the directory client for the specified path, creating the directory if needed
    directory_system_client = file_system_client.get_directory_client(file_path)
    if not directory_system_client.exists():
        directory_system_client.create_directory()
        print("Path created.")
    else:
        print("Specified path exists.")

    file_client = directory_system_client.get_file_client('telemetry.json')
    if not file_client.exists():
        # First payload for this path: start a new one-element list
        file_client.upload_data(json.dumps([json_data]).encode("utf-8"), overwrite=True)
        print("File created.")
        return

    existing_entries = json.loads(file_client.download_file().readall())
    new_timestamp = _entry_timestamp(json_data)
    for index, entry in enumerate(existing_entries):
        if _entry_timestamp(entry) == new_timestamp:
            existing_entries[index] = json_data
            merged = existing_entries
            break
    else:
        # No entry with this timestamp. This also covers an existing but empty
        # list, which previously left the merged result undefined.
        merged = [json_data] + existing_entries

    # Upload the updated JSON data to the file in ADLS
    file_client.upload_data(json.dumps(merged).encode("utf-8"), overwrite=True)
    print("Data appended.")

### Run

In [None]:
# Start migration
def main():
    """
    Query every measurement in the configured range and convert each record.
    Uses the notebook-level `service_name`, `bucket`, `start_at` and `stop_at`.
    Nothing is uploaded here; the upload happens in the next section.
    Returns:
    list: One (file_path, json_data) tuple per record, in query order.
    """
    print(service_name)

    collected = []
    for name in get_measurement_list(bucket, start_at, stop_at):
        tables = query_measurement(bucket, name, start_at, stop_at)
        for tbl in tables:
            collected.extend(
                (path, payload) for path, payload in map(process_data, tbl.records)
            )
    return collected

process_data_list = main()

DFL


### Multithreading

In [None]:
# Upload files in parallel
# send_to_ADLS rewrites one telemetry.json per path (download, merge, upload),
# and paths have minute resolution, so several records can target the same
# file. Group the payloads by path and let a single task handle each path
# sequentially; otherwise two threads can overwrite each other's update.
uploads_by_path = {}
for file_path, json_data in process_data_list:
    uploads_by_path.setdefault(file_path, []).append(json_data)


def _upload_path(item):
    """Upload, in order, every payload destined for one ADLS path."""
    file_path, payloads = item
    for json_data in payloads:
        send_to_ADLS(client, file_path, json_data)


with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:  # Adjust max_workers as needed
    # executor.map is lazy about results: draining it with list() is what
    # re-raises an exception from a worker instead of silently dropping it.
    list(executor.map(_upload_path, uploads_by_path.items()))

Specified path exists
Specified path exists
Specified path exists
Specified path exists
Specified path exists
Data appended.
Specified path exists
Specified path exists
Data appended.
Data appended.
Specified path exists
Specified path exists
Data appended.
Data appended.
Specified path exists
Data appended.
Specified path exists
Specified path exists
Data appended.
Data appended.
Specified path exists
Specified path exists
Data appended.
Specified path exists
Data appended.
Data appended.
Specified path exists
Specified path exists
Data appended.
Specified path exists
Data appended.
Specified path exists
Data appended.
Specified path exists
Data appended.
Specified path exists
Data appended.
Specified path exists
Data appended.
Specified path exists
Data appended.
Data appended.
Specified path exists
Specified path exists
Data appended.
Data appended.
Specified path exists
Specified path exists
Data appended.
Specified path exists
Data appended.
Specified path exists
Data appended.
Sp