Jupyter Notebook that produces capacity forecasts for Ceph storage pools.
To make this work you have to:

1. Enable Telegraf Telemetry in Ceph:
<pre>
ceph mgr module enable telegraf
ceph telegraf config-set address udp://:8094
ceph telegraf config-set interval 10
</pre>

Additionally, you have to configure Telegraf to receive these datagrams so it can forward them to your InfluxDB instance:

Create a file like /etc/telegraf/telegraf.d/ceph.conf with the following content:

<pre>
[[inputs.socket_listener]] 
  service_address = "udp://:8094" 
  data_format = "influx"
</pre>
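To check that the Telegraf listener is reachable before relying on the Ceph module, a test point can be sent by hand. The following is a minimal sketch, assuming Telegraf listens on localhost port 8094 as configured above; the measurement name `udp_smoke_test` and both helper names are hypothetical.

```python
import socket
import time


def build_test_line(measurement="udp_smoke_test", value=1):
    """Return one InfluxDB line-protocol record with a nanosecond timestamp."""
    timestamp_ns = time.time_ns()
    # Integer field values carry an "i" suffix in line protocol.
    return f"{measurement},source=manual value={value}i {timestamp_ns}"


def send_test_line(host="127.0.0.1", port=8094):
    """Send one record to the Telegraf socket_listener over UDP."""
    line = build_test_line()
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        # UDP is fire-and-forget: a successful send does not prove delivery.
        sock.sendto(line.encode("utf-8"), (host, port))
    return line
```

If the point later shows up in InfluxDB under the test measurement, the path from the UDP socket to the database works.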


2. Adjust the variables in this notebook to match your environment: set the fsid (the UUID identifying the Ceph cluster whose usage you want to monitor) and the InfluxDB credentials (imported from `credentials.py`).

3. After running this notebook you should have a new measurement called `ceph_cluster_stats_v15_forcast` that holds the forecast, based on the data of the past 365 days. The forecast horizon is set by `periods` in the `make_future_dataframe` call. You can now easily create a Grafana dashboard from it.

This is a real-world example based on a Ceph storage pool. Storage usage is usually subject to strong seasonality and is therefore a good showcase for time-series forecasting: daily and weekly snapshots combined with snapshot trimming typically produce a "heartbeat" pattern. The notebook should be easy to adapt to ZFS or any other type of storage.


This notebook uses the <a href="https://v2.docs.influxdata.com/v2.0/reference/client-libraries/">Python InfluxDB client library</a> and Facebook's Prophet to make forecasts. The basic approach is derived from the <a href="https://github.com/facebook/prophet/blob/master/notebooks/quick_start.ipynb">quick-start example notebook</a> in the <a href="https://github.com/facebook/prophet">prophet repo</a>.

4. You might consider deleting your old forecast when periodically making a new one.

```
influx delete -o <ORG> --bucket <BUCKET> --predicate '_measurement="ceph_cluster_stats_v15_forcast"' --start '1970-01-01T00:00:00Z' --stop  $(date +"%Y-%m-%dT%H:%M:%SZ")
```
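The same cleanup could be done from Python through the client's delete API. The sketch below is an illustration under assumptions: `build_delete_args`, `delete_old_forecast`, and the `horizon_days` parameter are hypothetical names. Because a delete only removes points whose timestamps fall inside the given time range, and forecast points lie in the future, the sketch extends `stop` beyond the current time.

```python
from datetime import datetime, timedelta, timezone


def build_delete_args(measurement, horizon_days=0, now=None):
    """Return (start, stop, predicate) for an InfluxDB delete request."""
    now = now or datetime.now(timezone.utc)
    start = "1970-01-01T00:00:00Z"
    # Push the stop time past the forecast horizon so future points match too.
    stop = (now + timedelta(days=horizon_days)).strftime("%Y-%m-%dT%H:%M:%SZ")
    predicate = f'_measurement="{measurement}"'
    return start, stop, predicate


def delete_old_forecast(client, bucket, org, measurement, horizon_days=0):
    """Delete previously written forecast points using an InfluxDBClient."""
    start, stop, predicate = build_delete_args(measurement, horizon_days)
    client.delete_api().delete(start, stop, predicate, bucket=bucket, org=org)
```

For example, `delete_old_forecast(client, bucket, org, "ceph_cluster_stats_v15_forcast", horizon_days=3 * 365)` would cover a three-year forecast.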

This is not complete code, but rather a proof of concept.

In [None]:
import pandas as pd
import time
from datetime import datetime
from prophet import Prophet

In [None]:
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from credentials import *

# Which measurement to write the forecast to
dst_measurement_name = "ceph_cluster_stats_v15_forcast"

# Ceph Pool fsid
fsid = "acaaae68-7fae-4eab-a389-c729edf8b37e"

# How many days from the past to train?
days = 365

# Influx Database Credentials
url = influx_url
token = influx_token
bucket = influx_bucket
org = influx_org
client = InfluxDBClient(url=url, token=token, org=org)
query_api = client.query_api()
write_api = client.write_api(write_options=SYNCHRONOUS)

In [None]:
query = 'from(bucket: "' + bucket + '")' \
        '  |> range(start: -' + str(days) + 'd)' \
        '  |> filter(fn: (r) => r._measurement == "ceph_cluster_stats_v15" and r.type_instance == "bytes_used" and r.fsid == "' + fsid + '")' \
        '  |> aggregateWindow(fn: mean, every: 1h, createEmpty: false)'



result = client.query_api().query(org=org, query=query)
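The query string can also be assembled in a small function, which keeps the quoting in one place and makes it easy to inspect. This is a minimal sketch; the function name `build_flux_query` is hypothetical, and the measurement and tag names are taken from the cell above.

```python
def build_flux_query(bucket, fsid, days, measurement="ceph_cluster_stats_v15"):
    """Build the Flux query for hourly means of bytes_used over the last `days` days."""
    days = int(days)
    if days <= 0:
        raise ValueError("days must be a positive integer")
    return (
        f'from(bucket: "{bucket}")\n'
        f'  |> range(start: -{days}d)\n'
        f'  |> filter(fn: (r) => r._measurement == "{measurement}"'
        f' and r.type_instance == "bytes_used" and r.fsid == "{fsid}")\n'
        '  |> aggregateWindow(fn: mean, every: 1h, createEmpty: false)'
    )


print(build_flux_query("my-bucket", "00000000-0000-0000-0000-000000000000", 365))
```

The bucket name and fsid in the `print` call are placeholders, not real values.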

In [None]:
raw = []
for table in result:
    for record in table.records:
        raw.append((record.get_value(), record.get_time()))

In [None]:
raw[0:5]

In [None]:
print()
print("=== influxdb query into dataframe ===")
print()
df=pd.DataFrame(raw, columns=['y','ds'], index=None)
df['ds'] = df['ds'].values.astype('<M8[D]')
df.head()
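Because `astype('<M8[D]')` truncates every timestamp to its date, the hourly means of one day all end up with the same `ds` value. If one row per day is preferred, the values can be averaged before building the dataframe. The following is a minimal standard-library sketch, assuming `raw` holds `(value, timezone-aware datetime)` tuples as collected above; `daily_means` and the sample data are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timezone


def daily_means(raw):
    """Average (value, timestamp) tuples per UTC calendar day."""
    buckets = defaultdict(list)
    for value, ts in raw:
        if value is None:
            continue  # skip empty records
        buckets[ts.astimezone(timezone.utc).date()].append(value)
    # Return (mean, day) tuples in chronological order, matching the ('y', 'ds') layout.
    return [(sum(vals) / len(vals), day) for day, vals in sorted(buckets.items())]


sample = [
    (10.0, datetime(2024, 1, 1, 0, tzinfo=timezone.utc)),
    (20.0, datetime(2024, 1, 1, 12, tzinfo=timezone.utc)),
    (None, datetime(2024, 1, 1, 13, tzinfo=timezone.utc)),
    (30.0, datetime(2024, 1, 2, 3, tzinfo=timezone.utc)),
]
print(daily_means(sample))
```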

We fit the model by instantiating a new `Prophet` object.  Any settings to the forecasting procedure are passed into the constructor.  Then you call its `fit` method and pass in the historical dataframe. Fitting should take 1-5 seconds.

In [None]:
m = Prophet(weekly_seasonality=True, yearly_seasonality=True,changepoint_prior_scale=0.0001).fit(df)

Predictions are then made on a dataframe with a column `ds` containing the dates for which a prediction is to be made. You can get a suitable dataframe that extends into the future a specified number of days using the helper method `Prophet.make_future_dataframe`. By default it will also include the dates from the history, so we will see the model fit as well. 

In [None]:
future = m.make_future_dataframe(periods=365*24*3, freq="H")
future.tail()
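With `freq="H"`, `periods` counts hours, so `365*24*3` extends the frame by three years of hourly rows. The arithmetic can be checked with a small standard-library sketch; `future_hours` is a hypothetical helper that only mimics the hourly spacing of the future rows.

```python
from datetime import datetime, timedelta


def future_hours(last_ts, periods):
    """Return `periods` hourly timestamps that follow `last_ts`."""
    return [last_ts + timedelta(hours=i) for i in range(1, periods + 1)]


periods = 365 * 24 * 3
horizon = timedelta(hours=periods)
print(periods, horizon.days)  # 26280 1095

print(future_hours(datetime(2024, 1, 1), 3))
```

For a one-year horizon at hourly resolution, `periods=365*24` would be the corresponding value.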

The `predict` method will assign each row in `future` a predicted value which it names `yhat`.  If you pass in historical dates, it will provide an in-sample fit. The `forecast` object here is a new dataframe that includes a column `yhat` with the forecast, as well as columns for components and uncertainty intervals.

In [None]:
forecast = m.predict(future)
forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail()
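For capacity planning, the interesting question is usually when the forecast first reaches the available capacity. The sketch below shows one way to answer it on plain `(ds, yhat)` rows; `first_exceedance`, `capacity_bytes`, and the sample numbers are hypothetical and not part of the original notebook.

```python
from datetime import datetime


def first_exceedance(rows, capacity_bytes, column=1):
    """Return the first timestamp whose forecast value reaches capacity_bytes."""
    for row in rows:
        if row[column] >= capacity_bytes:
            return row[0]
    return None  # capacity is not reached within the forecast horizon


sample_rows = [
    (datetime(2024, 1, 1), 70.0),
    (datetime(2024, 1, 2), 85.0),
    (datetime(2024, 1, 3), 101.0),
]
print(first_exceedance(sample_rows, capacity_bytes=100.0))
```

With the notebook's dataframe, the rows could be obtained via `forecast[['ds', 'yhat']].itertuples(index=False)`. Using `yhat_upper` instead of `yhat` would give a more conservative date.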

In [None]:
forecast['measurement'] = dst_measurement_name
forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper','measurement']].head()

You can plot the forecast by calling the `Prophet.plot` method and passing in your forecast dataframe.

In [None]:
from prophet.plot import add_changepoints_to_plot
fig1 = m.plot(forecast)
a = add_changepoints_to_plot(fig1.gca(),m,forecast)

In [None]:
fig2 = m.plot_components(forecast)


In [None]:
cp = forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper','measurement']].copy()
lines = [str(cp["measurement"][d]) 
         + ",type=forecast" 
         + " " 
         + "yhat=" + str(cp["yhat"][d]) + ","
         + "yhat_lower=" + str(cp["yhat_lower"][d]) + ","
         + "yhat_upper=" + str(cp["yhat_upper"][d])
         + " " + str(cp['ds'][d].value) for d in range(len(cp))]  # .value: nanoseconds since the epoch, naive timestamps read as UTC
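The line-protocol records can also be assembled by a small helper, which makes the format easier to read and test. This is a minimal sketch, assuming the `ds` timestamps are naive and represent UTC; `to_line` is a hypothetical name.

```python
import calendar
from datetime import datetime


def to_line(measurement, ds, yhat, yhat_lower, yhat_upper):
    """Format one forecast row as an InfluxDB line-protocol record."""
    # timegm interprets the time tuple as UTC and returns whole seconds.
    timestamp_ns = calendar.timegm(ds.timetuple()) * 1_000_000_000
    return (
        f"{measurement},type=forecast "
        f"yhat={yhat},yhat_lower={yhat_lower},yhat_upper={yhat_upper} "
        f"{timestamp_ns}"
    )


print(to_line("demo_forecast", datetime(1970, 1, 1, 0, 0, 1), 1.0, 0.5, 1.5))
```

Rows containing `NaN` would need to be filtered out first, since they do not form valid field values.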

In [None]:
from influxdb_client import InfluxDBClient, Point, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS

_write_client = client.write_api(write_options=WriteOptions(batch_size=1000, 
                                                            flush_interval=10000,
                                                            jitter_interval=2000,
                                                            retry_interval=5000))

_write_client.write(bucket, org, lines)

lines[0:10]


Close the write API and the client so that pending batches are flushed:

In [None]:
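# close() flushes outstanding batches and releases resources;
# calling __del__() directly is not the intended way to shut down.
_write_client.close()
client.close()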
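As an alternative to the background batching writer, the lines could be written synchronously in explicit chunks, with a context manager closing the write API. The following is a sketch under assumptions: `chunked` and `write_lines` are hypothetical helpers, and `client` is expected to be an `InfluxDBClient` instance as created earlier.

```python
def chunked(items, size):
    """Yield consecutive slices of `items` with at most `size` elements."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield items[start:start + size]


def write_lines(client, bucket, org, lines, batch_size=1000):
    """Write line-protocol records synchronously, one chunk at a time."""
    # Imported here so the chunking helper stays usable without the client library.
    from influxdb_client.client.write_api import SYNCHRONOUS

    with client.write_api(write_options=SYNCHRONOUS) as write_api:
        for batch in chunked(lines, batch_size):
            write_api.write(bucket=bucket, org=org, record=batch)


print(list(chunked(["a", "b", "c", "d", "e"], 2)))
```

Synchronous writes raise errors at the call site, which can be easier to debug in a notebook than failures inside a background batch.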