## Imports

In [1]:
import time

import numpy as np
from dotenv import load_dotenv

import logger_setup

import pandas as pd
from influxdb_client import InfluxDBClient
# Ciekawy statek:
# 215131000

import os
from influxdb_client import Point
from influxdb_client.client.write_api import SYNCHRONOUS

from csv_reader import ais_csv_to_df

## Functions

In [2]:
def create_point(row: pd.Series, measurement_name: str,
                 mmsi_fieldname="MMSI", vessel_name_fieldname="VesselName",
                 latitude_fieldname="LAT", longitude_fieldname="LON", time_fieldname="BaseDateTime"
                 ):
    t = "vessels_ais_31_12"
    point = (
        Point(measurement_name=measurement_name)
        .tag("mmsi", row[mmsi_fieldname])
        .tag("vessel_name", row[vessel_name_fieldname])
        .field("lat", row[latitude_fieldname])
        .field("lon", row[longitude_fieldname])
        .time(row[time_fieldname])
    )
    return point


def upload_df_to_influx_in_batches(df: pd.DataFrame, influx_client: InfluxDBClient, bucket_name: str,
                                   organization_id: str,
                                   batch_size: int = 100000,
                                   data_frame_tag_columns=["MMSI", "VesselName", "CallSign", "VesselType", "Status",
                                                           "Length", "Width", "Cargo", "TransceiverClass"]):
    logger.debug(f"Uploading to influxdb. Batch size: {batch_size}.")
    write_api = influx_client.write_api(write_options=SYNCHRONOUS)

    rows = df.shape[0]
    divisions = rows // batch_size + 1
    dfs = np.array_split(df, divisions)

    for i in range(divisions):
        logger.debug(f"Uploading division {i}/{divisions - 1}. Shape: {dfs[i].shape}. Processing...")
        write_api.write(bucket=bucket_name, org=organization_id,
                        record=dfs[i],
                        data_frame_measurement_name="vessels_ais_31_12",
                        data_frame_tag_columns=data_frame_tag_columns,
                        data_frame_timestamp_column="BaseDateTime",
                        )

### Setup

In [3]:
logger = logger_setup.setup_logging()
load_dotenv()
token = os.environ.get("API_INFLUX_KEY")
org = os.environ.get("INFLUX_ORG_ID")
url = "http://localhost:" + os.environ.get("INFLUX_PORT", "55000")

logger.debug(f"Token: {token}")
logger.debug(f"Organization id: {org}")
logger.info(f"Database endpoint: {url}")

2024-07-17 17:52:29,269 - DEBUG: Token: EJq62wZIuaOddyb4RZtujdg4Pv_o2lO5SNAdB5Dme5rK1bNkniAgMLnxLLugzT-epKiE4NVI71oMuZkfdj4ewg==
2024-07-17 17:52:29,271 - DEBUG: Organization id: bc3f6fcfff4173ac
2024-07-17 17:52:29,272 - INFO: Database endpoint: http://localhost:55000


## Working with data

### Setting spatial index

Influx uses s2 cells for this purpose.

In query this is `geo.shapeData()` function that does it. 

`s2_cell_id` has to be **saved as a tag** for other functions (such as `geo.filterRows()`) to work. 

In [15]:
import datetime

client = InfluxDBClient(url=url, token=token, org=org, timeout=60000)  # Set timeout to 60 seconds
query_api = client.query_api()


def generate_time_ranges_for_day(date: datetime.date, interval_minutes: float):
    current_start = datetime.datetime.combine(date, datetime.time.min)
    end_of_day = datetime.datetime.combine(date, datetime.time.max)
    while current_start < end_of_day:
        current_end = current_start + datetime.timedelta(minutes=interval_minutes)
        if current_end > end_of_day:
            current_end = end_of_day
        yield current_start.isoformat() + 'Z', current_end.isoformat() + 'Z'
        current_start = current_end


day = datetime.date(2020, 12, 31)
interval = 30

# Are these correct?
# raw_data_bucket = "temp_bucket_2"
# indexed_data_bucket = "shapedData_bucket2"
# indexed_data_bucket = "temp"

lat_field_name = "LAT"
lon_field_name = "LON"
level = 10

for start, end in generate_time_ranges_for_day(day, interval):
    print(f"From: {start}, to: {end}", end=" ")
    flux_query = f"""
import "experimental/geo"

from(bucket: "{raw_data_bucket}")
    |> range(start: {start}, stop: {end})
    |> filter(fn: (r) => r._measurement == "vessels_ais_31_12")
    |> filter(fn: (r) => r._field == "LAT" or r._field == "LON")
    |> geo.shapeData(latField: "{lat_field_name}", lonField: "{lon_field_name}", level: {level})
    |> to
        (bucket: "{indexed_data_bucket}", tagColumns: ["s2_cell_id", "MMSI"], fieldFn: (r) => ({{"lat": r.lat, "lon": r.lon}}))
    """

    # Execute the query
    result = query_api.query(flux_query)
    print("Finished.")
    break


From: 2020-12-31T00:00:00Z, to: 2020-12-31T00:30:00Z)


### Spatial query and spatiotemporal query

Well to be honest it's a spatiotemporal query, because influx requires you to specify time range for every query. This still takes full time into consideration.

In [16]:
client = InfluxDBClient(url=url, token=token, org=org, timeout=60000)  # Set timeout to 60 seconds
query_api = client.query_api()

# bucket = "shapedData_bucket2"
bucket = "temp"
start_date = "2020-12-31T00:00:00Z"
stop_date = "2020-12-31T00:59:59Z"
min_lat = 41.80
max_lat = 41.87
min_lon = -88.0
max_lon = -87.0
level = 10
strict = "true"
start_time = time.time()

query = f"""
import "experimental/geo"

region = {{
    minLat: {min_lat},
    maxLat: {max_lat},
    minLon: {min_lon},  
    maxLon: {max_lon},
}}

from(bucket: "{bucket}")
    |> range(start: {start_date}, stop: {stop_date})
    |> filter(fn: (r) => r._measurement == "vessels_ais_31_12")
    |> geo.filterRows(region: region, level: {level}, strict: {strict})
"""

tables = query_api.query_data_frame(query)

end_time = time.time()
logger.info(f"Query took {end_time - start_time} seconds without printing.")

logger.info("Closing database connection...")
client.close()


The result will not be shaped to optimal processing by pandas.DataFrame. Use the pivot() function by:

    
import "experimental/geo"

region = {
    minLat: 41.8,
    maxLat: 41.87,
    minLon: -88.0,  
    maxLon: -87.0,
}

from(bucket: "temp")
    |> range(start: 2020-12-31T00:00:00Z, stop: 2020-12-31T00:59:59Z)
    |> filter(fn: (r) => r._measurement == "vessels_ais_31_12")
    |> geo.filterRows(region: region, level: 10, strict: true)
 |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")



For more info see:
    - https://docs.influxdata.com/resources/videos/pivots-in-flux/
    - https://docs.influxdata.com/flux/latest/stdlib/universe/pivot/
    - https://docs.influxdata.com/flux/latest/stdlib/influxdata/influxdb/schema/fieldsascols/

2024-07-17 18:10:51,932 - INFO: Query took 6.4915361404418945 seconds without printing.
2024-07-17 18:10:51,932 - INFO: Closing database connection...


In [17]:
tables

Unnamed: 0,result,table,_start,_stop,_time,MMSI,_measurement,s2_cell_id,lat,lon
0,_result,0,2020-12-31 00:00:00+00:00,2020-12-31 00:59:59+00:00,2020-12-31 00:03:25+00:00,338866000,vessels_ais_31_12,8811d1,41.81003,-87.44458
1,_result,0,2020-12-31 00:00:00+00:00,2020-12-31 00:59:59+00:00,2020-12-31 00:04:36+00:00,338866000,vessels_ais_31_12,8811d1,41.80688,-87.44598
2,_result,0,2020-12-31 00:00:00+00:00,2020-12-31 00:59:59+00:00,2020-12-31 00:05:46+00:00,338866000,vessels_ais_31_12,8811d1,41.80374,-87.44744
3,_result,0,2020-12-31 00:00:00+00:00,2020-12-31 00:59:59+00:00,2020-12-31 00:06:56+00:00,338866000,vessels_ais_31_12,8811d1,41.80059,-87.44886
4,_result,1,2020-12-31 00:00:00+00:00,2020-12-31 00:59:59+00:00,2020-12-31 00:00:05+00:00,338866000,vessels_ais_31_12,8811d3,41.81906,-87.44051
5,_result,1,2020-12-31 00:00:00+00:00,2020-12-31 00:59:59+00:00,2020-12-31 00:01:06+00:00,338866000,vessels_ais_31_12,8811d3,41.81635,-87.44173
6,_result,1,2020-12-31 00:00:00+00:00,2020-12-31 00:59:59+00:00,2020-12-31 00:02:16+00:00,338866000,vessels_ais_31_12,8811d3,41.81318,-87.44316


### Time query

### Uploading data - slow

In [11]:
# Uploading data one by one - slow

# safety check - don't run unless you really want to
exit()

client: InfluxDBClient = InfluxDBClient(url=url, token=token, org=org)
bucket = "temp_bucket_2"

df = ais_csv_to_df("data/AIS_2020_12_31.csv")
df["VesselName"] = df["VesselName"].str.replace(" ", "\ ")
print("Creating points...")
df["Points"] = df.apply(create_point, axis=1, args=("vessels_ais_31_12",))
write_api = client.write_api(write_options=SYNCHRONOUS)
start_time = time.time()
print("Uploading points...")
for i, point in enumerate(df["Points"]):
    if i % 1000 == 0 and i != 0:
        print(
            f"Point {i}: {point}. Time elapsed: {time.time() - start_time}. Average time per point: {(time.time() - start_time) / i}")
    write_api.write(bucket=bucket, org=org, record=point)

logger.info("Closing database connection...")
client.close()

2024-07-08 08:30:26,167 - DEBUG: Loading data...


KeyboardInterrupt: 

### Uploading data - fast

In [None]:
# Uploading data do influx database in batches - fast

# safety check - don't run unless you really want to
exit()

client: InfluxDBClient = InfluxDBClient(url=url, token=token, org=org)
bucket = "temp_bucket_2"

df = ais_csv_to_df("data/AIS_2020_12_31.csv")
# df = df[["MMSI", "VesselName", "LAT", "LON", "BaseDateTime"]]
df["VesselName"] = df["VesselName"].str.replace(" ", "\ ")
df["CallSign"] = df["CallSign"].str.replace(" ", "\ ")
logger.debug(f"Dataframe shape: {df.shape}")

logger.debug("Beware! Executing The Command!")
start_time = time.time()
upload_df_to_influx_in_batches(df, client, bucket, org, 200000)
end_time = time.time()
logger.info(f"Upload time: {end_time - start_time}")

logger.info("Closing database connection...")
client.close()

## How to connect to the database

In [10]:
client: InfluxDBClient = InfluxDBClient(url=url, token=token, org=org)
bucket = "temp_bucket_2"

logger.info("Closing database connection...")
client.close()

2024-07-08 08:29:04,515 - INFO: Closing database connection...
