<!DOCTYPE html>
<html>

<body>
    <h2><span style="color: lightblue;">Influx v3</span></h2>
    <h3><strong>Desc:</strong></h3>
    <ul>
        <li>Write metrics to an InfluxDB database.</li>
        <li>Read metrics from a bucket.</li>
    </ul>
    <h3>Comments:</h3>
    <ul>
        <li>This notebook is still in <strong>DRAFT</strong> mode.</li>
        <li>This notebook was developed for InfluxDB Cloud v3. Do NOT use it with v2.</li>
    </ul>
</body>
</html>

## Prerequisites
1. Open an InfluxDB Cloud account (the free tier is enough): https://www.influxdata.com/get-influxdb/
2. Create a new bucket, for example ```Tables_Bucket``` (the name the code below uses).
3. To learn about InfluxDB terminology, read: https://docs.influxdata.com/influxdb/v2/get-started/
   
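The rest of the flow follows the built-in samples:
1. In the cloud console, open "Load Data" and choose the "Client" option.
2. Select "Python".
3. Follow the instructions. The "install dependencies" step is ```pip install influxdb3-python``` and ```pip install pandas```. The plotting cells below also need ```pip install plotly```.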

## Get a Token
Without a token, the client code can't call the API. To get one, log in to the cloud console and go to Load Data > API Tokens > Generate API Token.

In [None]:
# A %%bash "export" runs in a subprocess and never reaches the Python kernel,
# so set the variable with the IPython %env magic. Paste your own token here.
%env INFLUXDB_TOKEN=YOUR_API_TOKEN

## Initialize Client
Copy the code from the sample. I named the organization in Influx "Dev".

Note: the original code provided by Influx returns this error: ```[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:997)```. To solve it, pass the parameter ```ssl_ca_cert=certifi.where()``` to the client constructor. Resource: https://stackoverflow.com/questions/69401104/influxdb-2-0-certificate-verify-failed-certificate-has-expired-ssl-c1129
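The fix works because ```certifi.where()``` returns the path of a CA bundle file, and passing it as ```ssl_ca_cert``` points certificate verification at that bundle instead of the local trust store that could not find the issuer. A minimal sketch of the same mechanism using the standard library's ```ssl``` module, outside the InfluxDB client; it is not how ```influxdb_client_3``` wires it internally, and ```make_ssl_context``` is a hypothetical helper:

```python
import ssl


def make_ssl_context():
    """Return (context, ca_source): certifi's CA bundle if installed, else the system store."""
    try:
        import certifi  # ships Mozilla's CA bundle as a file on disk

        return ssl.create_default_context(cafile=certifi.where()), certifi.where()
    except ImportError:
        # Without certifi, fall back to the trust store Python was built against.
        return ssl.create_default_context(), "system default"


ctx, ca_source = make_ssl_context()
print("Verifying server certificates against:", ca_source)
```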
   

In [None]:
import os
import time

import certifi  # CA bundle used to work around the certificate error described above
from influxdb_client_3 import InfluxDBClient3, Point

# Read the token from the environment instead of hard-coding (and printing) it.
# Set INFLUXDB_TOKEN before starting Jupyter, or with %env in a cell.
token = os.environ.get("INFLUXDB_TOKEN")
org = "Dev"
host = "https://us-east-1-1.aws.cloud2.influxdata.com"

client = InfluxDBClient3(host=host, token=token, org=org, ssl_ca_cert=certifi.where())

## Copy (Insert) Data  
The object called "Bucket" in the UI is called "database" in Python.
The measurement is called "census". Notice that the ```Point``` object uses it.



In this data example, we have some important concepts:
- **measurement**: Primary filter for the thing you are measuring. Since we are measuring the sample census of insects, our measurement is "census".
- **tag**: Key-value pair to store metadata about your fields. We are storing the "location" of where each census is taken. Tags form part of your primary key.
- **field**: Key-value pair that stores the actual data you are measuring. We are storing the insect "species" and "count" as the key-value pair. Fields are not indexed and can be stored as integers, floats, strings, or booleans.
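Under the hood, each ```Point``` becomes one line of InfluxDB line protocol: the measurement, comma-separated tags, a space, the fields, and an optional nanosecond timestamp. A minimal sketch that renders the concepts above by hand; ```to_line_protocol``` is a hypothetical helper and, for brevity, it does not escape spaces, commas, or equals signs in keys and tag values:

```python
def to_line_protocol(measurement, tags, fields, timestamp_ns=None):
    """Render one point as line protocol (no escaping of special characters)."""
    # Tags are sorted by key, which InfluxDB recommends for write performance.
    tag_part = "".join(f",{k}={v}" for k, v in sorted(tags.items()))

    def fmt(value):
        if isinstance(value, bool):  # check bool first: bool is a subclass of int
            return "true" if value else "false"
        if isinstance(value, int):
            return f"{value}i"  # integer fields carry an "i" suffix
        if isinstance(value, float):
            return repr(value)
        return '"' + str(value).replace('"', '\\"') + '"'  # strings are double-quoted

    field_part = ",".join(f"{k}={fmt(v)}" for k, v in fields.items())
    line = f"{measurement}{tag_part} {field_part}"
    if timestamp_ns is not None:
        line += f" {timestamp_ns}"
    return line


print(to_line_protocol("census", {"location": "Klamath"}, {"bees": 25}))
# census,location=Klamath bees=25i
```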

In [None]:
database="Tables_Bucket"

data = {
  "point1": {
    "location": "Klamath",
    "species": "bees",
    "count": 25,
  },
  "point2": {
    "location": "Portland",
    "species": "ants",
    "count": 32,
  },
  "point3": {
    "location": "Klamath",
    "species": "bees",
    "count": 28,
  },
  "point4": {
    "location": "Portland",
    "species": "ants",
    "count": 36,
  },
  "point5": {
    "location": "Klamath",
    "species": "bees",
    "count": 27,
  },
  "point6": {
    "location": "Portland",
    "species": "ants",
    "count": 43,
  },
}

for key in data:
  point = (
    Point("census")
    .tag("location", data[key]["location"])
    .field(data[key]["species"], data[key]["count"])
  )
  client.write(database=database, record=point)
  time.sleep(1) # separate points by 1 second

print("Complete. Return to the InfluxDB UI.")
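The loop above sends one request per point and sleeps 1 second so that each point gets a distinct server-assigned timestamp. A minimal sketch of the alternative, assuming explicit nanosecond timestamps are acceptable for your data: give every point its own timestamp and send them all in one call. ```sample``` and ```build_batch``` are hypothetical names, and that ```client.write()``` accepts a list of line-protocol strings is an assumption to check against the influxdb3-python docs:

```python
import time

sample = {
    "point1": {"location": "Klamath", "species": "bees", "count": 25},
    "point2": {"location": "Portland", "species": "ants", "count": 32},
}


def build_batch(points, start_ns, step_ns=1_000_000_000):
    """One line-protocol string per point, timestamped step_ns apart (default: 1 s)."""
    lines = []
    for i, p in enumerate(points.values()):
        ts = start_ns + i * step_ns
        lines.append(f"census,location={p['location']} {p['species']}={p['count']}i {ts}")
    return lines


batch = build_batch(sample, start_ns=time.time_ns())
# client.write(database=database, record=batch)  # assumed: one request for the whole batch
```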


## Insert data from a CSV
The CLI command below (commented out) writes a sample annotated CSV stored in S3: https://influx-testdata.s3.amazonaws.com/air-sensor-data-annotated.csv

In [None]:
%%bash
# influx write --bucket Tables_Bucket --url https://influx-testdata.s3.amazonaws.com/air-sensor-data-annotated.csv

## Execute a Simple Query - SQL
InfluxDB v3 supports SQL; v2 did not.

In [None]:
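# Identifiers are double-quoted (or bare); single quotes make string literals,
# so 'bees' IS NOT NULL would be true for every row and filter nothing.
query = """SELECT *
FROM "census"
WHERE time >= now() - interval '168 hours'
AND (bees IS NOT NULL OR ants IS NOT NULL)"""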

# Execute the query
table = client.query(query=query, database="Tables_Bucket", language='sql') 

# Convert to dataframe
df = table.to_pandas().sort_values(by="time")
column_names = df.columns
print(column_names)
print(df)
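The quoting in the ```WHERE``` clause matters: in standard SQL, single quotes make a string literal and double quotes (or no quotes) name a column, so ```'bees' IS NOT NULL``` holds for every row. A small demonstration using Python's built-in ```sqlite3``` as a stand-in for InfluxDB's SQL engine; this is an assumption in the sense that only the quoting rule carries over, not the rest of the dialect:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE census (location TEXT, bees INTEGER, ants INTEGER)")
conn.executemany(
    "INSERT INTO census VALUES (?, ?, ?)",
    [("Klamath", 25, None), ("Portland", None, 32), ("Klamath", 28, None)],
)

# 'bees' is a string literal: never NULL, so every row matches.
literal = conn.execute("SELECT COUNT(*) FROM census WHERE 'bees' IS NOT NULL").fetchone()[0]
# "bees" is the column: only rows that actually recorded bees match.
column = conn.execute('SELECT COUNT(*) FROM census WHERE "bees" IS NOT NULL').fetchone()[0]
print(literal, column)  # 3 2
```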




## Advanced Query - SQL
Sep 17: this doesn't work. I'm not sure why, since it is copied and pasted from the demo.
I suspect the root cause is that I had to use "sql" as the language to make it work, not "influxql". The documentation says to use ```import influxdb_client_3 as InfluxDBClient3``` with InfluxQL, but that doesn't work.

Trying to implement ```GROUP BY``` using: https://docs.influxdata.com/influxdb/v1/query_language/explore-data/#the-group-by-clause

In [None]:
## Execute Aggregate Queries. The first one works
query = """
SELECT  location, max(time), avg(census.ants)
FROM "census"
WHERE time >= now() - interval '1 hour'
AND (ants IS NOT NULL)
GROUP BY location
"""


# Execute the query
table = client.query(query=query, database="Tables_Bucket", language='sql') 

# Convert to dataframe
df = table.to_pandas()#.sort_values(by='time')
print(df)
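To check the aggregate query's logic independently of InfluxDB, here is the same ```GROUP BY location``` shape run on the six census points in an in-memory SQLite table. This is a stand-in that assumes each point lands as its own row with the other species' column NULL, since each write sets only one field. Only Portland recorded ```ants```, so only one group comes back. In the notebook's query, the ```interval '1 hour'``` filter also excludes anything written more than an hour before the query runs.

```python
import sqlite3

rows = [
    ("Klamath", 25, None), ("Portland", None, 32), ("Klamath", 28, None),
    ("Portland", None, 36), ("Klamath", 27, None), ("Portland", None, 43),
]
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE census (location TEXT, bees INTEGER, ants INTEGER)")
conn.executemany("INSERT INTO census VALUES (?, ?, ?)", rows)

# One output row per location that has at least one non-NULL ants value.
result = conn.execute(
    "SELECT location, AVG(ants) FROM census WHERE ants IS NOT NULL GROUP BY location"
).fetchall()
print(result)  # [('Portland', 37.0)]
```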

## Query the data - Group by time ranges
You can always use good old Flux to run a GROUP BY query. The Flux example groups the data into 15-minute windows.
If the query doesn't return any data, no data was inserted into this Influx bucket in the queried time range (the last hour in the Flux example). See the cells above for how to insert data.

- Example 1 - running with the Influx CLI (```brew install influxdb-cli```). Apparently it still works with Flux.
- Example 2 - running with Python. The Python client can't use Flux anymore, only InfluxQL (https://docs.influxdata.com/influxdb/v1/query_language/). The only supported languages are now SQL and InfluxQL: https://docs.influxdata.com/influxdb/cloud-dedicated/reference/client-libraries/v3/python/#functions
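Both examples group rows into fixed, non-overlapping time windows and keep one aggregate per window (```aggregateWindow(every: 15m, fn: max)``` in Flux, ```GROUP BY time(60m)``` in InfluxQL). A plain-Python sketch of that tumbling-window max, with windows aligned to the Unix epoch, which is assumed to match both languages' default alignment. ```window_max``` and the readings are made up for illustration. One difference to keep in mind: Flux's ```aggregateWindow()``` labels each window with its stop time by default, while this sketch keys by window start.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_max(points, every):
    """Tumbling-window max over (timestamp, value) pairs, keyed by window start.

    Timestamps must be timezone-aware UTC datetimes; windows are aligned to the Unix epoch.
    """
    buckets = defaultdict(list)
    for ts, value in points:
        start = EPOCH + ((ts - EPOCH) // every) * every  # floor to the window boundary
        buckets[start].append(value)
    return {start: max(values) for start, values in sorted(buckets.items())}


t0 = datetime(2023, 9, 25, 10, 0, tzinfo=timezone.utc)
readings = [(t0, 41.0), (t0 + timedelta(minutes=5), 44.5), (t0 + timedelta(minutes=20), 43.2)]
print(window_max(readings, timedelta(minutes=15)))
```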

In [None]:
%%bash
influx query \
'from(bucket: "Tables_Bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "airSensors" and r._field == "humidity")
  |> aggregateWindow(every: 15m, fn: max)'



In [None]:
# Resource: Query data with InfluxQL: https://docs.influxdata.com/influxdb/cloud/query-data/influxql/
import os

import certifi
import pandas as pd
import plotly.express as px
import plotly.figure_factory as ff  # used by a later cell
from influxdb_client_3 import InfluxDBClient3

bucket = "Tables_Bucket"

# InfluxQL query (NOT Flux): maximum humidity per 60-minute window
query = """
SELECT max(humidity)
FROM airSensors
WHERE time >= '2023-09-25T00:00:00Z'
GROUP BY time(60m)
"""

# Read the token from the environment (e.g. set with %env) instead of hard-coding it.
token = os.environ.get("INFLUXDB_TOKEN")
org = "Dev"
host = "https://us-east-1-1.aws.cloud2.influxdata.com"

client = InfluxDBClient3(host=host, token=token, org=org, ssl_ca_cert=certifi.where())
# You can fetch only the schema to help troubleshooting:
# schema = client.query(query=query, database=bucket, mode="schema", language="influxql")
# print(schema)

table = client.query(query=query, database=bucket, mode="all", language="influxql")
print(table)

# table.to_pandas() seemed to drop the NULL values here (not good), so the
# Arrow columns (measurement, time, max) are copied into a DataFrame by position.
data_dict = {
    "Measurement": table[0].to_pylist(),
    "Time": table[1].to_pylist(),
    "Max": table[2].to_pylist(),
}
df = pd.DataFrame(data_dict)
print(df)

# Create a line plot using Plotly Express
fig = px.line(df, x="Time", y="Max", title="Max Values Over Time", labels={"Max": "Max Value"})
fig.show()

# Close the client connection
client.close()


## Testing on V3 - real world data

In [None]:
# Resource: Query data with InfluxQL: https://docs.influxdata.com/influxdb/cloud/query-data/influxql/
import os

import certifi
import pandas as pd
import plotly.figure_factory as ff
from influxdb_client_3 import InfluxDBClient3

# Read a token for the "Metis-v3" organization from the environment instead of hard-coding it.
token = os.environ.get("INFLUXDB_TOKEN")
org = "Metis-v3"
host = "https://eu-central-1-1.aws.cloud2.influxdata.com"
bucket = "test"

client = InfluxDBClient3(host=host, token=token, org=org, ssl_ca_cert=certifi.where())
# You can fetch only the schema to help troubleshooting:
# schema = client.query(query=query, database=bucket, mode="schema", language="influxql")
# print(schema)

# InfluxQL query (NOT Flux): the last 14 days of a specific query ID, grouped by hour.
# THIS QUERY DOESN'T WORK: it should show the hourly difference for every hour, but it shows 0.
query = """
SELECT time, 
    max(calls)
    --DERIVATIVE(max(calls))
FROM QUERY_DETAILS
WHERE time >= '2023-09-14T00:00:00Z'
AND apiKey = 'mj1Vde9QC664608cJ24CV3Zf2Y9tdgzt20P8quOm'
AND host = 'database-2.cofhrj7zmyn4.eu-central-1.rds.amazonaws.com'
AND db = 'platform-v2'
AND query_id = '-1004574566510211833'
GROUP BY time(60m)
"""
table = client.query(query=query, database=bucket, mode="all", language="influxql")
# print(table)

# table.to_pandas() seemed to drop the NULL values, so the Arrow columns
# (measurement, time, max) are copied into a DataFrame by position.
data_dict = {
    "Measurement": table[0].to_pylist(),
    "Time": table[1].to_pylist(),
    "Max": table[2].to_pylist(),
}
df = pd.DataFrame(data_dict)
print(df)

############################################
# Hourly sum of calls per query over the last 2 hours
query = """
SELECT
  SUM("calls") AS sum_calls
FROM
  "test"."autogen"."QUERY_DETAILS"
WHERE
  time >= now() - 2h
  AND apiKey = 'mj1Vde9QC664608cJ24CV3Zf2Y9tdgzt20P8quOm'
  AND "host" = 'database-2.cofhrj7zmyn4.eu-central-1.rds.amazonaws.com'
  AND "db" = 'platform-v2'
GROUP BY
  time(1h), apiKey, "host", "db", "query_id"

"""

table = client.query(query=query, database=bucket, mode="all", language="influxql")
print(table)

data_dict = {
    "Measurement": table[0].to_pylist(),
    "Time": table[1].to_pylist(),
    "apiKey": table[2].to_pylist(),
    "Sum": table.column("sum_calls").to_pylist(),  # the SUM("calls") AS sum_calls column
}
df = pd.DataFrame(data_dict)
print(df)

# Render the DataFrame as a table figure in Jupyter
plot_table = ff.create_table(df)
plot_table.show()

# Close the client connection
client.close()


## Testing Line Protocol

In [None]:
%%bash
influx write -b Tables_Bucket -o Dev -f /Users/itaybraun/Documents/GitHub/db-observability-toolkit/Research_Notebooks/influx_line_protocol_1.txt
  

## Testing CSV

In [None]:
%%bash
influx write --bucket Tables_Bucket -f /Users/itaybraun/Documents/GitHub/db-observability-toolkit/Research_Notebooks/influx_csv.csv

Testing CSV, now with more data

In [None]:
import csv
from datetime import datetime, timedelta

# Define the parameters
host = "host1"
database = "db1"
query_ids = ["qry1", "qry2", "qry3", "qry4", "qry5", "qry6", "qry7", "qry8", "qry9", "qry10"]
start_time = datetime(2023, 9, 13, 0, 0, 0)  # Sep 13, 2023, 00:00:00
calls_increment = 1000
time_difference = timedelta(minutes=5)

# Generate and write the CSV data
with open('queries.csv', 'w', newline='') as csvfile:
    fieldnames = ['#datatype measurement', 'tag', 'tag', 'tag', 'double', 'double', 'double', 'dateTime:RFC3339']
    writer = csv.writer(csvfile)
    
    # Write the first row of metadata
    writer.writerow(fieldnames)

    # Write the second row with field names
    writer.writerow(['queries', 'host', 'db', 'query_id', 'calls', 'total_exec_time', 'rows_read', 'time'])

    # Generate 4,000 rows (one every 5 minutes) for each of the 10 query IDs
    for query_id in query_ids:
        for i in range(4000):
            timestamp = start_time + i * time_difference
            calls = i * calls_increment

            # Create a list representing a row of data
            row = [
                'queries',
                host,
                database,
                query_id,
                calls,
                calls * 8,  # Assuming a constant multiplier for total_exec_time
                calls * 1000,  # Assuming a constant multiplier for rows_read
                timestamp.isoformat() + 'Z'
            ]

            # Write the row to the CSV file
            writer.writerow(row)

print("Data generation complete. Output written to queries.csv.")
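Before running ```influx write``` on the generated file, it can help to check that the ```#datatype``` annotation row, the header row, and every data row have the same number of columns. A minimal sketch using only the standard ```csv``` module; ```check_annotated_csv``` is a hypothetical helper, and it checks shape only, not whether values parse as the declared types:

```python
import csv
import io


def check_annotated_csv(text):
    """Validate the two header rows of an annotated CSV and return the data-row count."""
    reader = csv.reader(io.StringIO(text))
    datatypes = next(reader)
    header = next(reader)
    if not datatypes[0].startswith("#datatype "):
        raise ValueError("first row must be a #datatype annotation")
    if len(datatypes) != len(header):
        raise ValueError("annotation and header rows have different column counts")
    count = 0
    for row in reader:
        if len(row) != len(header):
            raise ValueError(f"line {count + 3} has {len(row)} columns, expected {len(header)}")
        count += 1
    return count
```

Usage: ```with open("queries.csv", newline="") as f: print(check_annotated_csv(f.read()))``` should report 40,000 rows for the file generated above (10 query IDs × 4,000 rows).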


In [None]:
# Verify the file was created successfully, and check its size.
import os

file_path = 'queries.csv'

# Function to get the number of rows and file size
def get_file_stats(file_path):
    with open(file_path, 'r') as file:
        # Read the number of rows (excluding header rows)
        num_rows = sum(1 for row in file) - 2  # Subtracting 2 for the header rows

    # Get file size in kilobytes
    file_size_kb = os.path.getsize(file_path) / 1024

    return num_rows, file_size_kb

# Get file statistics
num_rows, file_size_kb = get_file_stats(file_path)

# Format the number of rows with commas
formatted_num_rows = "{:,}".format(num_rows)

# Display the results
print(f"Number of rows in the file: {formatted_num_rows}")
print(f"File size: {file_size_kb:.2f} KB")


In [None]:
%%bash
# influx write -b Tables_Bucket -o Dev -f /Users/itaybraun/Documents/GitHub/db-observability-toolkit/Research_Notebooks/queries.csv

Read using Flux (Influx CLI)

In [None]:
%%bash
influx query \
'from(bucket: "Tables_Bucket")
  |> range(start: -14d, stop: -12d)
  |> filter(fn: (r) => r._measurement == "queries" and r._field == "calls" and r.query_id == "qry1")
  |> aggregateWindow(every: 60m, fn: max)
  |> difference(nonNegative: true)
  |> drop(columns: ["_start", "_stop", "_measurement"])'
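The Flux pipeline above takes the hourly maximum of the cumulative ```calls``` counter and then the difference between consecutive hours. For the generated data (```calls``` grows by 1,000 every 5 minutes), each complete hour's maximum is 12,000 higher than the previous one, so every difference should be 12,000. A plain-Python sketch of the ```difference(nonNegative: true)``` step; as a simplifying assumption it reports a negative step (for example a counter reset) as ```None```, and Flux's own handling of negative steps may differ, so check the ```difference()``` docs. ```non_negative_difference``` is a hypothetical helper.

```python
def non_negative_difference(values):
    """Differences between consecutive window values; negative steps become None."""
    diffs = []
    for prev, cur in zip(values, values[1:]):
        d = cur - prev
        diffs.append(d if d >= 0 else None)
    return diffs


# Hourly maxima of the generated counter: 11,000, 23,000, 35,000, ...
hourly_max = [11000, 23000, 35000]
print(non_negative_difference(hourly_max))  # [12000, 12000]
```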

In [None]:
import os

import certifi
from influxdb_client_3 import InfluxDBClient3

bucket = "Tables_Bucket"

# InfluxQL query (NOT Flux): hourly max of calls per query ID
query = """
SELECT query_id, max(calls)
FROM "queries"
WHERE time >= '2023-09-25T00:00:00Z'
GROUP BY query_id, time(60m)
"""

# query_2 did not work as originally written: "time >= -14d" does not mean
# "14 days ago" in InfluxQL; relative time bounds are written against now().
query_2 = """
SELECT derivative(max("calls"), 5m) AS "difference_calls"
FROM "queries"
WHERE time >= now() - 14d AND time < now() - 12d AND query_id = 'qry1'
GROUP BY time(60m) fill(null)
"""

# Read the token from the environment instead of hard-coding it.
token = os.environ.get("INFLUXDB_TOKEN")
org = "Dev"
host = "https://us-east-1-1.aws.cloud2.influxdata.com"

client = InfluxDBClient3(host=host, token=token, org=org, ssl_ca_cert=certifi.where())
# You can fetch only the schema to help troubleshooting:
# schema = client.query(query=query, database=bucket, mode="schema", language="influxql")
# print(schema)

table = client.query(query=query_2, database=bucket, mode="all", language="influxql")
print(table)

# Close the client connection
client.close()
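InfluxQL's ```derivative(max("calls"), 5m)``` divides each change between consecutive values by the elapsed time expressed in the given unit (here 5 minutes). With one-hour buckets that is the hourly change divided by 12, so for the generated data it should come out as 1,000 per 5 minutes rather than the 12,000 hourly difference. A sketch of that arithmetic in plain Python; the ```derivative``` helper is hypothetical, stamps each rate with the later timestamp, and simply skips the empty buckets that ```fill(null)``` can produce.

```python
from datetime import datetime, timedelta, timezone


def derivative(points, unit):
    """Rate of change between consecutive (time, value) pairs, per `unit` of time."""
    pts = [(t, v) for t, v in points if v is not None]  # ignore empty (null) buckets
    out = []
    for (t0, v0), (t1, v1) in zip(pts, pts[1:]):
        elapsed_units = (t1 - t0) / unit  # timedelta / timedelta -> float
        out.append((t1, (v1 - v0) / elapsed_units))
    return out


hour = timedelta(hours=1)
start = datetime(2023, 9, 13, tzinfo=timezone.utc)
hourly_max = [(start, 11000), (start + hour, 23000), (start + 2 * hour, 35000)]
print(derivative(hourly_max, timedelta(minutes=5)))  # each rate is 1000.0 calls per 5 minutes
```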


## Delete a bucket


In [None]:
import requests

# InfluxDB API endpoint (replace with your cloud URL, e.g. the host used above)
base_url = "http://localhost:8086"
org = "your_organization"            # Replace with your organization name
bucket = "your_bucket"               # Replace with the name of the bucket you want to delete
token = "your_authentication_token"  # Replace with your API token

headers = {"Authorization": f"Token {token}"}

# The v2 API deletes buckets by ID, not by name, so look the ID up first.
lookup = requests.get(
    f"{base_url}/api/v2/buckets",
    headers=headers,
    params={"org": org, "name": bucket},
)
lookup.raise_for_status()
buckets = lookup.json().get("buckets", [])

if not buckets:
    print(f"Bucket '{bucket}' not found in organization '{org}'.")
else:
    bucket_id = buckets[0]["id"]
    # Send the DELETE request for that bucket ID
    response = requests.delete(f"{base_url}/api/v2/buckets/{bucket_id}", headers=headers)

    # 204 No Content means the bucket was deleted
    if response.status_code == 204:
        print(f"Bucket '{bucket}' deleted successfully.")
    else:
        print(f"Failed to delete bucket '{bucket}'. Status code: {response.status_code}")
        print(response.text)
