In [1]:
#!pip install pyspark

In [3]:
import datetime
import itertools
import json
import logging
import math
import time

import io
import numpy as np
import pandas as pd
from prophet import Prophet
from prophet.serialize import model_to_json
from pyspark.accumulators import AccumulatorParam
from pyspark.sql import functions as F
from pyspark.sql.functions import (col, collect_list, current_date, date_sub,
                                   row_number, struct, to_timestamp)
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window

In [4]:
import os
import sys

# Make the Spark driver and its Python workers use the interpreter running this kernel.
os.environ.update(
    {
        "PYSPARK_PYTHON": sys.executable,
        "PYSPARK_DRIVER_PYTHON": sys.executable,
    }
)

In [5]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import (col, collect_list, current_date, date_sub,
                                   row_number, struct, to_timestamp)
from pyspark.sql import functions as F
import os
from datetime import timedelta
from typing import Optional
from pyspark.sql.types import (ArrayType, BooleanType, DoubleType, LongType,
                               StringType, StructField, StructType)

In [6]:
# Create (or reuse) a single-threaded local Spark session for this notebook.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()
)
# Turn on Arrow for pandas <-> Spark conversions.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

### Pipeline

In [66]:
import pyspark.sql.functions as F
from pyspark.sql.functions import avg as _avg, max as _max, min as _min
from pyspark.sql.functions import to_timestamp, current_timestamp, current_date, to_date
from pyspark.sql.functions import explode, col, floor, lit, lower, when

In [68]:
# Load the raw power-meter reports; the reader is told to accept multi-line JSON
# documents and to look for files recursively under the given path.
df = (
    spark.read
    .option("multiline", "true")
    .option("recursiveFileLookup", "true")
    .json("powermeter.json")
)

In [70]:
# Distinct (report, device) pairs present in the raw input.
distinct_df = df.select("report_id", "device_id").distinct()

source_fields = df.columns
optional_fields = [
    "model", "location_state", "location_country", "processor_vendor",
    "server_generation", "location_id", "location_name",
    "location_city", "server_name",
]

# Optional attributes absent from the source are added as null columns so the
# selects further down can reference them unconditionally. Declaration order is
# kept (a set difference would make the column order vary between runs).
missing_option_fields = [name for name in optional_fields if name not in source_fields]
for field in missing_option_fields:
    # A bare lit(None) yields an untyped (NullType) column; give it an explicit
    # string type so the column stays usable in writes and unions.
    df = df.withColumn(field, lit(None).cast("string"))

In [72]:
'inventory_data' in df.columns

True

In [74]:
# Produce one row per entry of data.PowerDetail (via explode), carrying the
# report/device attributes along and serialising the inventory arrays to JSON text.
df = df.select(
    # report-level fields
    "report_id",
    "status",
    "report_type",
    "error_reason",
    # device / customer identity
    "device_id",
    "application_customer_id",
    "platform_customer_id",
    # server attributes
    "model",
    "location_state",
    "location_country",
    "processor_vendor",
    "server_generation",
    "location_id",
    "location_name",
    "location_city",
    "server_name",
    # metric payload
    F.col("data.Id").alias("metric_id"),
    F.explode("data.PowerDetail").alias("MetricValues"),
    # inventory details
    F.to_json(F.col("inventory_data.cpu_inventory")).alias("cpu_inventory"),
    F.to_json(F.col("inventory_data.memory_inventory")).alias("memory_inventory"),
    F.col("inventory_data.PCIe_devices_count").alias("pcie_devices_count"),
    F.col("inventory_data.socket_count").alias("socket_count"),
)

In [78]:
# Replace the MetricValues struct with two flat columns: the average reading and
# its timestamp converted to a long.
df = df.select(
    "report_id",
    "status",
    "report_type",
    "error_reason",
    "device_id",
    "application_customer_id",
    "platform_customer_id",
    "model",
    "location_state",
    "location_country",
    "processor_vendor",
    "server_generation",
    "location_id",
    "location_name",
    "location_city",
    "server_name",
    "metric_id",
    "cpu_inventory",
    "memory_inventory",
    "pcie_devices_count",
    "socket_count",
    F.col("MetricValues.Average").alias("MetricValue"),
    F.to_timestamp(F.col("MetricValues.Time")).cast("long").alias("MetricTimestamp"),
)

In [82]:
# Keep only readings that carry both a timestamp and a value.
df = df.filter(col("MetricTimestamp").isNotNull() & col("MetricValue").isNotNull())

# Add short-name aliases, seed avg/max/min with the raw reading, and derive the
# hour bucket [datetime, timeRangeEnd) by flooring the timestamp to a multiple of 3600.
df = (
    df.select(
        "report_id", "status", "report_type", "error_reason", "device_id",
        "application_customer_id", "platform_customer_id", "MetricValue", "model",
        "location_state", "location_country", "processor_vendor", "server_generation",
        "location_id", "location_name", "location_city", "server_name", "metric_id",
        "cpu_inventory", "memory_inventory", "pcie_devices_count", "socket_count",
        col("device_id").alias("deviceid"),
        col("application_customer_id").alias("acid"),
        col("platform_customer_id").alias("pcid"),
        col("MetricValue").alias("avg_metric_value"),
        col("MetricValue").alias("max_metric_value"),
        col("MetricValue").alias("min_metric_value"),
        to_date(to_timestamp(col("MetricTimestamp"))).alias("metric_time"),
        to_timestamp(floor(col("MetricTimestamp") / 3600) * 3600).alias("datetime"),
        to_timestamp(floor(col("MetricTimestamp") / 3600) * 3600 + 3600).alias("timeRangeEnd"),
    )
    .withColumn("date", current_date())
    .withColumn("Insertiontime", current_timestamp())
)

# Keep a single row per (customer, device, hour bucket).
df = df.dropDuplicates(["application_customer_id", "device_id", "datetime", "timeRangeEnd"])

# Rows per platform customer, counted after de-duplication. An identical
# pre-dedup computation used to precede this one but was overwritten before
# anything read it, so it has been dropped.
df_report_count = df.groupby("platform_customer_id").count()

In [84]:
df.head()

Row(report_id='', status=True, report_type='OPTED_IN', error_reason='', device_id='001587-E22+8899001587122689', application_customer_id='51df092eec6f11ee892e6a5d057cf06d', platform_customer_id='427275fcddef11ebaeaea25b204e9436', MetricValue=176, model='ProLiant DL385 Gen10 Plus AMD Analytics', location_state='Karnataka', location_country='India', processor_vendor='AMD EPYC 7262 8-Core Processor                 ', server_generation='GEN_10', location_id='cdf733b4-327a-475a-b99a-dc1f0284a2b8', location_name='Bangalore-test-5', location_city='Bangalore', server_name='8899001587122689', metric_id='slowpowermeter', cpu_inventory='[]', memory_inventory='[]', pcie_devices_count=0, socket_count=0, deviceid='001587-E22+8899001587122689', acid='51df092eec6f11ee892e6a5d057cf06d', pcid='427275fcddef11ebaeaea25b204e9436', avg_metric_value=176, max_metric_value=176, min_metric_value=176, metric_time=datetime.date(2025, 5, 9), datetime=datetime.datetime(2025, 5, 9, 8, 30), timeRangeEnd=datetime.date

### Forecast

In [436]:
# Read the per-server history and keep only rows with a positive, non-null reading.
# The path is built with os.path.join: the previous literal "csv_dir\server0.csv"
# relied on the invalid escape sequence "\s" and on a Windows-only separator.
df = (
    spark.read
    .option("header", "true")
    .csv([os.path.join("csv_dir", "server0.csv")])
    .filter(col("avg_metric_value").isNotNull() & (col("avg_metric_value") > 0))
)

In [438]:
df.cache()

DataFrame[Unnamed: 0: string, datetime: string, avg_metric_value: string, device_id: string, application_customer_id: string, platform_customer_id: string, metric_id: string, model: string, server_name: string, server_generation: string, processor_vendor: string, location_city: string, location_state: string, location_country: string, energy_cost_factor: string, co2_factor: string]

In [409]:
df.count()

11214

In [440]:
df.head(5)

[Row(Unnamed: 0='6768', datetime='2024-12-05 00:00:00', avg_metric_value='161.2980769230769', device_id='177298-X97+8899177298959715', application_customer_id='aci1', platform_customer_id='pci1', metric_id='SlowPowerMeter', model='ProLiant DL360 Gen10', server_name='stt-hst-esxi09', server_generation='GEN_10', processor_vendor='Intel(R) Xeon(R) Silver 4116 CPU @ 2.50GHz', location_city='Fort Collins', location_state='Colorado', location_country='United States', energy_cost_factor='0.109', co2_factor='0.46184473'),
 Row(Unnamed: 0='6769', datetime='2024-12-05 01:00:00', avg_metric_value='161.01482371794873', device_id='177298-X97+8899177298959715', application_customer_id='aci1', platform_customer_id='pci1', metric_id='SlowPowerMeter', model='ProLiant DL360 Gen10', server_name='stt-hst-esxi09', server_generation='GEN_10', processor_vendor='Intel(R) Xeon(R) Silver 4116 CPU @ 2.50GHz', location_city='Fort Collins', location_state='Colorado', location_country='United States', energy_cost_f

In [442]:
# Minimum rows a device needs to be kept: 80% of 30 * 24 hourly samples.
required_record_count = int(0.80 * (30 * 24))

In [444]:
# Number of rows available for each (customer, device) pair.
device_count_df = (
    df.groupBy("application_customer_id", "device_id")
    .count()
    .withColumnRenamed("count", "device_count")
)

In [446]:
# Keys of the devices that meet the minimum row count.
filtered_device_ids = (
    device_count_df
    .where(F.col("device_count") >= required_record_count)
    .select("application_customer_id", "device_id")
)

In [449]:
# Inner-join against the (broadcast) list of qualifying devices so that only
# their rows remain.
joined_df = df.join(
    F.broadcast(filtered_device_ids),
    ["application_customer_id", "device_id"],
    how="inner",
)

In [451]:
joined_df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [application_customer_id#10821, device_id#10820, Unnamed: 0#10817, datetime#10818, avg_metric_value#10819, platform_customer_id#10822, metric_id#10823, model#10824, server_name#10825, server_generation#10826, processor_vendor#10827, location_city#10828, location_state#10829, location_country#10830, energy_cost_factor#10831, co2_factor#10832]
   +- BroadcastHashJoin [application_customer_id#10821, device_id#10820], [application_customer_id#11293, device_id#11292], Inner, BuildRight, false
      :- Filter (isnotnull(application_customer_id#10821) AND isnotnull(device_id#10820))
      :  +- InMemoryTableScan [Unnamed: 0#10817, datetime#10818, avg_metric_value#10819, device_id#10820, application_customer_id#10821, platform_customer_id#10822, metric_id#10823, model#10824, server_name#10825, server_generation#10826, processor_vendor#10827, location_city#10828, location_state#10829, location_country#10830, energy_cost_factor#1

In [453]:
joined_df.show(5, False)

+-----------------------+---------------------------+----------+-------------------+------------------+--------------------+--------------+--------------------+--------------+-----------------+------------------------------------------+-------------+--------------+----------------+------------------+----------+
|application_customer_id|device_id                  |Unnamed: 0|datetime           |avg_metric_value  |platform_customer_id|metric_id     |model               |server_name   |server_generation|processor_vendor                          |location_city|location_state|location_country|energy_cost_factor|co2_factor|
+-----------------------+---------------------------+----------+-------------------+------------------+--------------------+--------------+--------------------+--------------+-----------------+------------------------------------------+-------------+--------------+----------------+------------------+----------+
|aci1                   |177298-X97+8899177298959715|6768    

In [455]:
joined_df.rdd.isEmpty()

False

In [457]:
# Count of distinct device ids among the retained rows.
num_device_id = joined_df.select(F.countDistinct("device_id")).first()[0]

In [459]:
# Expose the reading as a double column named "y" and drop the string original.
# The column is referenced by name rather than through `df`: `df` is a different
# DataFrame that later cells rebind, so `df["avg_metric_value"]` made this cell
# depend on execution order.
df_analytics = joined_df.withColumn(
    "y", F.col("avg_metric_value").cast(DoubleType())
).drop("avg_metric_value")

In [461]:
# Rename the time column to "ds" and discard rows where "ds" or "y" is null.
df_analytics = (
    df_analytics
    .withColumnRenamed("datetime", "ds")
    .na.drop(subset=["ds", "y"])
)

In [463]:
# One partition for every two devices, never fewer than one.
num_partitions = max(num_device_id // 2, 1)

In [465]:
# Hash-partition by device so all rows of a device land in the same partition.
df_analytics = df_analytics.repartition(num_partitions, F.col("device_id"))

In [468]:
df_analytics.cache()

DataFrame[application_customer_id: string, device_id: string, Unnamed: 0: string, ds: string, platform_customer_id: string, metric_id: string, model: string, server_name: string, server_generation: string, processor_vendor: string, location_city: string, location_state: string, location_country: string, energy_cost_factor: string, co2_factor: string, y: double]

In [470]:
df_analytics.show(5)

+-----------------------+--------------------+----------+-------------------+--------------------+--------------+--------------------+--------------+-----------------+--------------------+-------------+--------------+----------------+------------------+----------+------------------+
|application_customer_id|           device_id|Unnamed: 0|                 ds|platform_customer_id|     metric_id|               model|   server_name|server_generation|    processor_vendor|location_city|location_state|location_country|energy_cost_factor|co2_factor|                 y|
+-----------------------+--------------------+----------+-------------------+--------------------+--------------+--------------------+--------------+-----------------+--------------------+-------------+--------------+----------------+------------------+----------+------------------+
|                   aci1|177298-X97+889917...|      6768|2024-12-05 00:00:00|                pci1|SlowPowerMeter|ProLiant DL360 Gen10|stt-hst-esxi09

In [472]:
df.unpersist(blocking=True)

DataFrame[Unnamed: 0: string, datetime: string, avg_metric_value: string, device_id: string, application_customer_id: string, platform_customer_id: string, metric_id: string, model: string, server_name: string, server_generation: string, processor_vendor: string, location_city: string, location_state: string, location_country: string, energy_cost_factor: string, co2_factor: string]

In [474]:
# NOTE(review): this is a Python strftime/strptime-style pattern. Spark SQL date
# functions use a different pattern syntax (e.g. "yyyy-MM-dd HH:mm:ss"), so
# confirm the consumer before passing this to a Spark function.
timestamp_format = "%Y-%m-%d %H:%M:%S"

In [476]:
# Add a proper timestamp column derived from the string column "ds".
# The previous version also passed the strftime-style `timestamp_format` to
# to_timestamp(); Spark does not understand "%Y-%m-%d ..." patterns, and the
# conversion was already being done by the cast, so the cast alone is kept.
df = df_analytics.withColumn(
    "timestamp",
    col("ds").cast("timestamp"),
)

In [478]:
# Per (device, customer) window, newest row first.
window_spec = (
    Window
    .partitionBy("device_id", "application_customer_id")
    .orderBy(F.desc("timestamp"))
)

In [480]:
# Rank each row inside its window; row_num == 1 is the most recent row.
df_with_rownum = df.withColumn("row_num", F.row_number().over(window_spec))

In [482]:
# Keep only the most recent row per (device, customer) and drop the helper column.
server_attributes_df = (
    df_with_rownum
    .where(F.col("row_num") == 1)
    .drop("row_num")
)

In [484]:
server_attributes_dict = {}
count = server_attributes_df.count()

In [486]:
server_attributes_df.show(5, truncate=False)

+-----------------------+---------------------------+----------+-------------------+--------------------+--------------+--------------------+--------------+-----------------+------------------------------------------+-------------+--------------+----------------+------------------+----------+------------------+-------------------+
|application_customer_id|device_id                  |Unnamed: 0|ds                 |platform_customer_id|metric_id     |model               |server_name   |server_generation|processor_vendor                          |location_city|location_state|location_country|energy_cost_factor|co2_factor|y                 |timestamp          |
+-----------------------+---------------------------+----------+-------------------+--------------------+--------------+--------------------+--------------+-----------------+------------------------------------------+-------------+--------------+----------------+------------------+----------+------------------+-------------------+
|

In [488]:
def _server_attributes(row):
    """Build the attribute dict for one row of ``server_attributes_df``.

    Attributes that are not columns of the row come back as ``None``.
    ``location_display_name`` joins city, state and country with ", " and
    skips parts that are missing, null or empty, so it never contains the
    literal text "None" or an empty ", ," segment.
    """
    fields = row.asDict()
    location_parts = (
        fields.get("location_city"),
        fields.get("location_state"),
        fields.get("location_country"),
    )
    return {
        "model": fields.get("model"),
        "server_name": fields.get("server_name"),
        "server_generation": fields.get("server_generation"),
        "processor_vendor": fields.get("processor_vendor"),
        "location_display_name": ", ".join(str(part) for part in location_parts if part),
        "energy_cost_factor": fields.get("energy_cost_factor"),
        "co2_factor": fields.get("co2_factor"),
    }


# Lookup keyed by "<device_id>_<application_customer_id>"; the rows are
# collected to the driver.
server_attributes_dict = {
    f"{row['device_id']}_{row['application_customer_id']}": _server_attributes(row)
    for row in server_attributes_df.collect()
}

In [490]:
# Group the rows by device and customer identifiers (aggregated in the next cell).
# NOTE: `df` is now a GroupedData object, so this cell cannot be run twice in a row.
df = df.groupBy(
    ["device_id", "platform_customer_id", "application_customer_id"]
)

In [492]:
# Collapse every group into one row whose "data" column lists its (ds, y) structs.
df = df.agg(
    F.collect_list(F.struct(F.col("ds"), F.col("y"))).alias("data")
)

In [494]:
df.show(5)

+--------------------+--------------------+-----------------------+--------------------+
|           device_id|platform_customer_id|application_customer_id|                data|
+--------------------+--------------------+-----------------------+--------------------+
|177298-X97+889917...|                pci1|                   aci1|[{2024-12-05 00:0...|
+--------------------+--------------------+-----------------------+--------------------+



In [496]:
def cached(df_analytics):
    """Return the given object unchanged (identity pass-through)."""
    same_object = df_analytics
    return same_object

In [498]:
new_df = cached(df_analytics)

In [500]:
new_df.unpersist(blocking=True)

DataFrame[application_customer_id: string, device_id: string, Unnamed: 0: string, ds: string, platform_customer_id: string, metric_id: string, model: string, server_name: string, server_generation: string, processor_vendor: string, location_city: string, location_state: string, location_country: string, energy_cost_factor: string, co2_factor: string, y: double]

In [502]:
def transform_data(row):
    """Convert one grouped row into a 4-hourly pandas frame for forecasting.

    Args:
        row: mapping-like object with the keys ``"data"``, ``"device_id"``,
            ``"application_customer_id"`` and ``"platform_customer_id"``.
            ``row["data"]`` is an iterable of items exposing ``asDict()`` that
            return ``{"ds": ..., "y": ...}``.

    Returns:
        dict with the three identifiers copied from ``row`` and ``"data"``: a
        DataFrame with columns ``ds`` and ``y`` holding the mean of the positive
        ``y`` values per 4-hour bin, or an empty DataFrame when there are no
        points or no positive values.
    """
    device_id = row["device_id"]
    application_customer_id = row["application_customer_id"]
    platform_customer_id = row["platform_customer_id"]

    # Transform [pyspark.sql.Row] -> [dict]
    points = [point.asDict() for point in row["data"]]

    resampled_data = pd.DataFrame()
    # Guard: an empty list would produce a frame without a "ds" column and the
    # lookups below would raise KeyError.
    if points:
        frame = pd.DataFrame(points)
        frame["ds"] = pd.to_datetime(frame["ds"])
        frame = frame[frame["y"] > 0]

        if not frame.empty:
            # Lower-case "h" is the supported hourly alias; the upper-case "H"
            # spelling is deprecated in current pandas releases.
            resample_frequency = "4h"

            # Average the readings that fall into each 4-hour bin.
            resampled_data = (
                frame.resample(resample_frequency, on="ds").mean().reset_index()
            )

    return {
        "device_id": device_id,
        "platform_customer_id": platform_customer_id,
        "application_customer_id": application_customer_id,
        "data": resampled_data,
    }

In [504]:
def expand_predictions(d, server_attributes_dict):
    """Turn one transformed record into a list of output tuples.

    Args:
        d: either a dict with the keys ``"device_id"``,
            ``"platform_customer_id"`` and ``"application_customer_id"`` (the
            shape returned by ``transform_data``), or a 4-item sequence
            ``(device_id, platform_customer_id, application_customer_id, data)``.
        server_attributes_dict: accepted for interface compatibility; not read
            by the current implementation.

    Returns:
        A one-element list containing
        ``(device_id, application_customer_id, platform_customer_id)``.
    """
    if isinstance(d, dict):
        # Look the values up by key. Tuple-unpacking a dict iterates over its
        # KEYS, which previously emitted the literal strings "device_id",
        # "application_customer_id", ... instead of the real identifiers.
        device_id = d["device_id"]
        platform_customer_id = d["platform_customer_id"]
        application_customer_id = d["application_customer_id"]
    else:
        device_id, platform_customer_id, application_customer_id, _data = d

    return [(device_id, application_customer_id, platform_customer_id)]

In [506]:
# Output schema: three nullable string columns, in this order.
power_forecast_schema = StructType(
    [
        StructField(column_name, StringType(), True)
        for column_name in (
            "device_id",
            "application_customer_id",
            "platform_customer_id",
        )
    ]
)

In [508]:
# Run transform_data on every grouped row, then flatten the tuples produced by
# expand_predictions. At this point `result_df` is an RDD, not a DataFrame.
result_df = (
    df.rdd
    .map(transform_data)
    .flatMap(lambda transformed: expand_predictions(transformed, server_attributes_dict))
)

In [510]:
# Materialise the RDD of tuples as a DataFrame with the forecast schema.
result_df = spark.createDataFrame(result_df, schema=power_forecast_schema)

In [512]:
result_df.cache()

DataFrame[device_id: string, application_customer_id: string, platform_customer_id: string]

In [514]:
result_df.count()

1

In [516]:
spark.stop()

11214