# Imports

In [1]:
from influxdb.resultset import ResultSet
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
from influxdb import InfluxDBClient
from pandas import json_normalize
from datetime import datetime
import pandas as pd
import multiprocessing
import urllib3

In [2]:
import logging

# All notebook logging is appended to output.log in the working directory.
LOG_FORMAT = "%(name)s: %(asctime)s - %(message)s"
logging.basicConfig(
    filename='output.log',
    filemode='a',
    format=LOG_FORMAT,
    level=logging.INFO,
)
logger = logging.getLogger('Data_Query')

# Disable noisy loggers so they do not pollute the log file.
# NOTE(review): 'py4j.clientserver' looks like PySpark's Py4J bridge rather than InfluxDB,
# despite the variable name; the name is kept so other cells keep working.
# Disabling a logger only stops records logged directly on it; named loggers such as
# 'Data_Query' still propagate to the root handlers configured above.
logger_influxDB = logging.getLogger('py4j.clientserver')
logger_root = logging.getLogger('root')
for _noisy_logger in (logger_influxDB, logger_root):
    _noisy_logger.disabled = True


# Initialize Spark Session

In [3]:
# A single Spark session for the notebook; getOrCreate() hands back an existing one if present.
spark = (
    SparkSession.builder
    .appName('Joining Data')
    .getOrCreate()
)

# Data Preprocessing

In [4]:
# Reading data, removing duplicates, joining tables and selecting desired columns

df2 = spark.read.option('header', 'true').csv('tsdb_data.csv')                          # Reading data from csv file
logger.info("Successfully Read Data from tsdb_data.csv file")
# Distinct (anlage_id, steuereinheitnummer) pairs whose anlage_id is known.
temp = (
    df2.na.drop(subset=["anlage_id"])
    .select('anlage_id', 'steuereinheitnummer')
    .dropDuplicates()
)

df1 = spark.read.option('header', 'true').csv('a_inst_all.csv')                         # Reading data from csv file
logger.info("Successfully Read Data from a_inst_all.csv file")
df3 = df1.join(temp, df1.Steuereinheit_Nr == temp.steuereinheitnummer, "inner")        # Joining tables using Control Unit Number
# Deduplicate AFTER selecting the columns. Rows of a_inst_all.csv that share a control unit but
# differ in other columns are distinct in df3, so they would otherwise yield repeated pairs here.
df4 = df3.select('anlage_id', 'steuereinheitnummer').dropDuplicates()

logger.info("Successful Data Preprocessing")

In [5]:
# Converting the data from spark table to a pandas dataframe so that we can get a list of ids for easy access
new_data = df4.toPandas()
# Deduplicate while keeping first-seen order. One control unit can be paired with several
# anlage_ids; a repeated id would be queried twice and, when results are matched back to
# ids, only its first occurrence gets a time (the other copies stay None).
device_ids = list(dict.fromkeys(new_data['steuereinheitnummer']))

# Authentication

In [6]:
import getpass
import os

# Credentials come from the environment (or an interactive prompt) instead of being
# hard-coded, so they are never saved in the notebook file or its outputs.
INFLUX_DB_USER      = os.environ.get("INFLUX_DB_USER") or input("InfluxDB user: ")
INFLUX_DB_PASSWORD  = os.environ.get("INFLUX_DB_PASSWORD") or getpass.getpass("InfluxDB password: ")
# query_influxdb connects with verify_ssl=False; silence urllib3's per-request warning about it.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Data Fetching and Parsing

In [7]:
# The list of servers to connect to; query_influxdb sends every query to each of them.
serverlist = [
    "influxdb****************.com",
    "influxdb****************.com",
    "influxdb****************.com",
    "influxdb****************.com",
    "influxdb****************.com",
    "influxdb****************.com"
]

max_ids     = 500                                                       # Set to -1 to run over the whole data
batch_size  = 50                                                        # Number of ids to query the servers at once
# Extract the ids that are being queried. Always take a copy (slicing already copies) so that
# in-place edits to id_list can never alter device_ids through aliasing.
id_list = list(device_ids) if max_ids == -1 else device_ids[:max_ids]

In [8]:
"""
This function takes in a list ofqueries the database for the data, parses the response and returns a list of dictionaries containing the data
"""
def query_influxdb(query):
    query_response = []
    res_count = 0   
    for influxdb in serverlist:
        client = InfluxDBClient(
            host=influxdb,
            port=443,
            database="tsdb",
            username=INFLUX_DB_USER,
            password=INFLUX_DB_PASSWORD,
            ssl=True,
            verify_ssl=False,
            pool_size=1
        )
        data = client.query(query)
        for row in data.items():
            # Parse the date recieved from the database
            query_response.extend([dict(**{'mainControllerSerialDeviceId':row[0][1]["mainControllerSerialDeviceId"], 
                            'time': datetime.strptime(item['time'], '%Y-%m-%dT%H:%M:%SZ')}) 
                            for item in list(row[1])])
        res_count += len(data)
    print(f"Found {res_count} result(s)")
    return query_response

In [9]:
# Generating all the queries to be sent to the database: one query per batch of `batch_size` ids.
# Only the first `max_ids` ids are queried (all of them when max_ids == -1). Slicing first keeps
# the last batch from running past max_ids when it is not a multiple of batch_size.
ids_to_query = device_ids if max_ids == -1 else device_ids[:max_ids]
queries = []
for start in range(0, len(ids_to_query), batch_size):
    batch = ids_to_query[start:start + batch_size]
    # Regex alternation "id1|id2|id3|...". It is unanchored, so it also matches stored ids that
    # merely contain one of these ids (e.g. suffixed ids, which post-processing strips with [:-2]).
    ids = "|".join(batch)

    query = f"""SELECT * FROM "wallbox_v123_measurement" WHERE mainControllerSerialDeviceId =~ /{ids}/ GROUP BY mainControllerSerialDeviceId ORDER BY time ASC LIMIT 1"""
    queries.append(query)
logger.info("Successfully prepared Database queries")

In [10]:
#len(queries)

## Multiprocessed data fetching from the server

In [11]:
import time

# Run the queries in parallel: one pool task per batch query, each handled by query_influxdb.
# NOTE(review): workers look up query_influxdb in the notebook's __main__, which relies on the
# 'fork' start method (Linux default) — confirm before running under 'spawn' (macOS/Windows).
# perf_counter() is monotonic, so the elapsed time is not skewed by system clock adjustments.
start_time = time.perf_counter()
with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
    query_responses = pool.map(query_influxdb, queries)
logger.info(f"Successfully retrieved data from database in {time.perf_counter() - start_time} seconds")

Found 1 result(s)
Found 2 result(s)
Found 5 result(s)
Found 6 result(s)
Found 3 result(s)
Found 1 result(s)
Found 0 result(s)
Found 2 result(s)
Found 6 result(s)
Found 6 result(s)


In [12]:
#len(query_responses)

# Post Processing

In [13]:
# Flatten the per-query lists returned by the worker pool into one list of records.
query_results = [record for response in query_responses for record in response]

In [14]:
# Record how many points all queries returned in total.
total_results = len(query_results)
logger.info(f"Total results from database: {total_results}")

In [15]:
# Convert the query data into a dataframe, sort by id and time, and keep the first row per id.
# The sort is ascending, and each query already asks for ORDER BY time ASC LIMIT 1, so this keeps
# the EARLIEST entry for every device id returned by any server.
# The explicit columns keep sort_values working even when no server returned any rows.
query_dataframe = (
    pd.DataFrame(query_results, columns=['mainControllerSerialDeviceId', 'time'])
    .sort_values(by=['mainControllerSerialDeviceId', 'time'])
    .drop_duplicates(subset=['mainControllerSerialDeviceId'], keep='first')
    .reset_index(drop=True)
)
logger.info("Retrieved earliest record for every device id.")

In [16]:
#print(f'Number of ids with a response: {len(query_dataframe)}')

In [17]:
#query_dataframe

In [18]:
def _attach_query_times(ids, results):
    """Expand each requested id with its matching query results and timestamps.

    A result matches an id when its ``mainControllerSerialDeviceId`` minus its last two
    characters equals that id. The id is then replaced, at its position, by all matching
    full ids. Ids without a match are kept as-is with a ``None`` time.

    Example: ids ``[a, b, c, d, e, f, g]`` and results for ``[a_1, a_2, c_1, d_1]`` give::

        pd.DataFrame({
            'id':   [a_1, a_2, b, c_1, d_1, e, f, g],
            'time': [time_a_1, time_a_2, None, time_c_1, time_d_1, None, None, None]
        })

    Parameters
    ----------
    ids : list
        Requested device ids; not modified, so re-running the cell gives the same result.
    results : pandas.DataFrame
        Frame with ``mainControllerSerialDeviceId`` and ``time`` columns.

    Returns
    -------
    pandas.DataFrame
        Columns ``id`` and ``time``, in the order of ``ids``.
    """
    # Group results by base id, preserving the order of ``results`` (sorted by full id upstream).
    matches_by_base_id = {}
    for full_id, timestamp in zip(results['mainControllerSerialDeviceId'], results['time']):
        matches_by_base_id.setdefault(full_id[:-2], []).append((full_id, timestamp))

    expanded_ids, expanded_times = [], []
    for device_id in ids:
        # pop(): if an id occurs several times, only its first occurrence receives the matches.
        matches = matches_by_base_id.pop(device_id, None)
        if matches:
            for full_id, timestamp in matches:
                expanded_ids.append(full_id)
                expanded_times.append(timestamp)
        else:
            expanded_ids.append(device_id)
            expanded_times.append(None)

    return pd.DataFrame({
        'id': expanded_ids,
        'time': expanded_times
    })


output_dataframe = _attach_query_times(id_list, query_dataframe)

logger.info("Database response converted into desired format.")

In [19]:
#output_dataframe

# Saving the final table as a Parquet table

In [20]:
# Persist the result as Parquet. Despite the "delta" naming, the plain Parquet writer is used.
delta_table_path = "delta_id_table.parquet"
output_delta_table = spark.createDataFrame(output_dataframe)

# Probe whether the table already exists; spark.read.parquet raises AnalysisException when the
# path is missing (or holds no readable parquet data).
try:
    spark.read.parquet(delta_table_path)
    table_exists = True
except AnalysisException:
    table_exists = False

if table_exists:
    logger.info("Delta table already exists, writing to it in append mode.")
else:
    logger.info(f"Delta table Does Not Exists, Creating a new Delta table with the name \'{delta_table_path}\'")

# Write the NEW rows from output_dataframe. Appending a re-read of the existing table would only
# duplicate old data. Append mode also creates the path when it does not exist yet.
output_delta_table.write.mode('append').parquet(delta_table_path)

logger.info(f"Successfully wrote data to delta table in the directory {delta_table_path}")
logger.info('\n')

In [21]:
# Final confirmation for interactive runs; detailed progress lives in the log file.
completion_message = "Execution Successful! Logs are saved in output.log"
print(completion_message)

Execution Successful! Logs are saved in output.log
