```bash
apt-get update
apt install default-jre

curl -O https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.34.0/sqlite-jdbc-3.34.0.jar
```

In [46]:
import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .master("local")
    .appName("SQLite JDBC")
    .config(
        "spark.jars",
        "/workspaces/mirrorverse/sqlite-jdbc-3.34.0.jar")
    .config(
        "spark.jars",
        "target/spark-tensorflow-connector_2.11-1.10.0.jar"
    )
    .config(
        "spark.driver.extraClassPath",
        "/workspaces/mirrorverse/sqlite-jdbc-3.34.0.jar")
    .getOrCreate())

ps_conn = "jdbc:sqlite:/workspaces/mirrorverse/mirrorverse.db" #"sqlite:////workspaces/mirrorverse/mirrorverse.db"
pd_conn = "sqlite:////workspaces/mirrorverse/mirrorverse.db"

24/06/23 23:05:20 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [47]:
states_sql = '''
select 
    d.epoch,
    t.h3_level_4_key,
    d.depth,
    "t" || t.tag_key as fish_id,
    ROW_NUMBER() over () as _decision
from 
    tag_depths d
    inner join tag_tracks t 
        on d.tag_key = t.tag_key
        and d.date_key = t.date_key
where
    d.depth is not null
'''

elevation_sql = '''
select 
    h3_level_4_key,
    elevation
from
    elevation
'''


In [48]:
from scipy.stats import norm
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

def select_a_class(depth):
    """
    Inputs:
    - depth: float, the depth of the fish as recorded

    Outputs:
    - int, the selected depth class

    Selects a depth class based on the depth of the fish.

    It turns out that PSAT summary data bins the depth into
    intervals so the actual depth is not known. However
    given the recorded depth we can estimate the depth classes
    it could belong to and the likelihoods of each.
    """
    depth_classes = np.array([25, 50, 75, 100, 150, 200, 250, 300, 400, 500])
    sd = (
        depth * 0.08 / 1.96
    )  # ~two standard deviations gives our 95% confidence interval
    if sd == 0:
        division = np.zeros(len(depth_classes))
        division[0] = 1
    else:
        # we're going to assume the depth classes are sorted
        z = (depth_classes - depth) / sd
        division = norm.cdf(z)
        division[1:] = division[1:] - division[:-1]
    # if there aren't quite enough depth classes the
    # probabilities may not sum to 1, so we'll normalize
    division = division / division.sum()
    return float(np.random.choice(depth_classes, p=division))

select_a_class_udf = udf(select_a_class, DoubleType())

In [49]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import TimestampType

@pandas_udf(TimestampType())
def epoch_to_datetime(epoch):
    return pd.to_datetime(epoch, utc=True, unit="s")

In [50]:
from pyspark.sql.types import StructType, StructField, FloatType
import h3


schema = StructType([
    StructField("latitude", FloatType(), True),
    StructField("longitude", FloatType(), True)
])

def h3_to_geo(h3_key):
    h3_key = hex(h3_key)[2:]
    lat, lon = h3.h3_to_geo(h3_key)
    return lat, lon

h3_to_geo_udf = udf(h3_to_geo, schema)

In [51]:
from suntimes import SunTimes
from pyspark.sql.types import IntegerType

def get_sunrise(lat, lon, date):
    """
    Inputs:
    - lat: float, latitude
    - lon: float, longitude
    - date: str, date

    Outputs:
    - int, hour of sunrise
    """
    return SunTimes(longitude=lon, latitude=lat, altitude=0).risewhere(date, "UTC").hour


def get_sunset(lat, lon, date):
    """
    Inputs:
    - lat: float, latitude
    - lon: float, longitude
    - date: str, date

    Outputs:
    - int, hour of sunset
    """
    return SunTimes(longitude=lon, latitude=lat, altitude=0).setwhere(date, "UTC").hour

get_sunrise_udf = udf(get_sunrise, IntegerType())
get_sunset_udf = udf(get_sunset, IntegerType())

In [52]:
@pandas_udf(FloatType())
def get_period_progress(date, sunrise, sunset, daytime):
    hour = date.dt.hour
    hours_to_transition = (
        (
            (hour > sunrise) * (24 - hour + sunrise)
            + (hour <= sunrise) * (sunrise - hour)
        )
        * (1 - daytime)
    ).astype(float) + (
        (
            (hour > sunset) * (24 - hour + sunset)
            + (hour <= sunset) * (sunset - hour)
        )
        * (daytime)
    ).astype(
        float
    )

    interval = (1 - daytime) * (
        (sunrise >= sunset) * (sunrise - sunset)
        + (sunrise < sunset) * (24 - sunset + sunrise)
    ) + (daytime) * (
        (sunset >= sunrise) * (sunset - sunrise)
        + (sunset < sunrise) * (24 - sunrise + sunset)
    )

    return (
        1 - hours_to_transition / interval
    ).astype(float)

@pandas_udf(IntegerType())
def get_daytime(date, sunrise, sunset):
    hour = date.dt.hour
    return (
        (sunrise > sunset) * ((hour < sunset) + (hour >= sunrise))
        + (sunrise < sunset) * ((hour >= sunrise) * (hour < sunset))
    ).astype(int)

@pandas_udf(IntegerType())
def get_month(date):
    return date.dt.month

In [58]:
def explode(iterator):
    depth_classes = [25, 50, 75, 100, 150, 200, 250, 300, 400, 500]
    for df in iterator:
        input = df.copy()
        input['_selected'] = input['depth_class']
        features = ['_selected']
        for i, depth_class in enumerate(depth_classes):
            input[f'month_{i}'] = input['month']
            input[f'daytime_{i}'] = input['daytime']
            input[f'period_progress_{i}'] = input['period_progress']
            input[f'depth_class_{i}'] = depth_class
            features.extend([f'month_{i}', f'daytime_{i}', f'period_progress_{i}', f'depth_class_{i}'])
        
        yield input[features]


import tensorflow as tf

def serialize_example(row):
    # Create a dictionary with the features
    depth_classes = [25, 50, 75, 100, 150, 200, 250, 300, 400, 500]
    feature = {
        '_selected': tf.train.Feature(int64_list=tf.train.Int64List(value=[depth_classes.index(row.depth_class)])),
    }
    for i, depth_class in enumerate(depth_classes):
        current_feature = {
            f'month_{i}': tf.train.Feature(int64_list=tf.train.Int64List(value=[row.month])),
            f'daytime_{i}': tf.train.Feature(int64_list=tf.train.Int64List(value=[row.daytime])),
            f'period_progress_{i}': tf.train.Feature(float_list=tf.train.FloatList(value=[row.period_progress])),
            f'depth_class_{i}': tf.train.Feature(int64_list=tf.train.Int64List(value=[depth_class]))
        }
        feature.update(current_feature)
    # Create an Example protocol buffer
    example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
    return example_proto.SerializeToString()

states = (
    spark.read.format("jdbc").option("url",ps_conn).option("query", states_sql)
    .option("customSchema", "epoch INTEGER, h3_level_4_key BIGINT, depth FLOAT, fish_id STRING, _decision INTEGER")
    .load()
)

states = states.withColumn("depth_class", select_a_class_udf(states["depth"]))
states = states.drop("depth")

states = states.withColumn("geo", h3_to_geo_udf(states["h3_level_4_key"]))
states = states.withColumn("latitude", states["geo"]["latitude"])
states = states.withColumn("longitude", states["geo"]["longitude"])
states = states.drop("geo")

states = states.withColumn("datetime", epoch_to_datetime(states["epoch"]))
states = states.withColumn("sunrise", get_sunrise_udf(states["latitude"], states["longitude"], states["datetime"]))
states = states.withColumn("sunset", get_sunset_udf(states["latitude"], states["longitude"], states["datetime"]))
states = states.withColumn("daytime", get_daytime(states["datetime"], states["sunrise"], states["sunset"]))
states = states.withColumn("period_progress", get_period_progress(states["datetime"], states["sunrise"], states["sunset"], states["daytime"]))
states = states.withColumn("month", get_month(states["datetime"]))

states = states.drop("latitude", "longitude", "sunrise", "sunset", "datetime", "epoch")

#schema = [
#    StructField("_selected", IntegerType(), True),
#]
#for i in range(10):
#    schema.extend([
#        StructField(f"month_{i}", IntegerType(), True),
#        StructField(f"daytime_{i}", IntegerType(), True),
#        StructField(f"period_progress_{i}", FloatType(), True),
#        StructField(f"depth_class_{i}", IntegerType(), True),
#    ])
#schema = StructType(schema)
#features = states.mapInPandas(explode, schema=schema)

#output_path = "/workspaces/mirrorverse/features"
#features.write.format("tfrecords").mode("overwrite").save(output_path)

import os
import uuid

def save_to_tfrecord(rdd, output_dir):
    def write_partition(iterator):
        partition_id = str(uuid.uuid4())
        file_path = os.path.join(output_dir, f"part-{partition_id}.tfrecord")
        with tf.io.TFRecordWriter(file_path) as writer:
            for record in iterator:
                writer.write(record)

    rdd.foreachPartition(write_partition)

features = states.rdd.map(serialize_example)

output_path = "/workspaces/mirrorverse/features"
save_to_tfrecord(features, output_path)
#features.show()

                                                                                

In [83]:
depth_classes = [25, 50, 75, 100, 150, 200, 250, 300, 400, 500]
feature_description = {
    '_selected': tf.io.FixedLenFeature([], tf.int64),
}
for i, depth_class in enumerate(depth_classes):
    current_feature = {
        f'month_{i}': tf.io.FixedLenFeature([], tf.int64),
        f'daytime_{i}': tf.io.FixedLenFeature([], tf.int64),
        f'period_progress_{i}': tf.io.FixedLenFeature([], tf.float32),
        f'depth_class_{i}': tf.io.FixedLenFeature([], tf.int64)
    }
    feature_description.update(current_feature)

def _parse_function(proto):
    # Parse the input tf.train.Example proto using the dictionary above
    return tf.io.parse_single_example(proto, feature_description)


dataset = tf.data.TFRecordDataset(output_path + '/part-8ae44d22-6dbc-42c2-afad-6ec3abd642cf.tfrecord')
dataset = dataset.map(_parse_function)


from tensorflow.keras.utils import to_categorical

def prepare_data(features):
    inputs = {}
    for i in range(10):
        input = tf.stack([
            tf.cast(features[f'month_{i}'], tf.float32),
            tf.cast(features[f'daytime_{i}'], tf.float32),
            tf.cast(features[f'period_progress_{i}'], tf.float32),
            tf.cast(features[f'depth_class_{i}'], tf.float32)
        ])
        inputs[f'input_{i}'] = input
    label = to_categorical(features['_selected'], num_classes=10)
    return inputs, label

dataset = dataset.map(prepare_data)
dataset = dataset.shuffle(buffer_size=1000)
dataset = dataset.batch(batch_size=1000)
dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

In [84]:
import tensorflow.keras
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Convolution2D, MaxPooling2D, Flatten , Activation, Input, BatchNormalization, Dropout, concatenate
from tensorflow.keras.utils import to_categorical

def build_model(max_choices, features, layers, activation='linear'):
    layers.append(Dense(1, activation=activation))
    inputs = [Input(shape=(len(features),), name=f'input_{i}') for i in range(max_choices)]
    outcomes = []
    for input in inputs:
        last_layer = input
        for layer in layers:
            last_layer = layer(last_layer)
        outcomes.append(last_layer)

    outcomes = concatenate(outcomes)

    output_layer = Dense(max_choices, activation='softmax')
    output = output_layer(outcomes)
    output_layer.set_weights([np.eye(max_choices), np.zeros(max_choices)])
    output_layer.trainable = False

    model = Model(inputs=inputs, outputs=output)
    model.compile(optimizer='adam', loss='categorical_crossentropy')

    return model, layers

layers = [
    Dense(32, activation='relu'),
    Dense(16, activation='relu'),
    Dense(8, activation='relu')
]

model, layers = build_model(10, ['month', 'daytime', 'period_progress', 'depth_class'], layers)
model.summary()

In [85]:
model.fit(dataset, epochs=10)

Epoch 1/10
[1m1034/1034[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m36s[0m 33ms/step - loss: 2.5170
Epoch 2/10
[1m   1/1034[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m2:24[0m 140ms/step - loss: 2.0277

2024-06-24 00:10:12.256577: W tensorflow/core/framework/local_rendezvous.cc:404] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence
	 [[{{node IteratorGetNext}}]]
  self.gen.throw(typ, value, traceback)


[1m1034/1034[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m35s[0m 34ms/step - loss: 1.6986
Epoch 3/10
[1m   1/1034[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m2:34[0m 150ms/step - loss: 1.9163

2024-06-24 00:10:47.196689: W tensorflow/core/framework/local_rendezvous.cc:404] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence
	 [[{{node IteratorGetNext}}]]


[1m1034/1034[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m37s[0m 35ms/step - loss: 1.6299
Epoch 4/10
[1m   1/1034[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m3:04[0m 179ms/step - loss: 1.9295

2024-06-24 00:11:23.883284: W tensorflow/core/framework/local_rendezvous.cc:404] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence
	 [[{{node IteratorGetNext}}]]


[1m 919/1034[0m [32m━━━━━━━━━━━━━━━━━[0m[37m━━━[0m [1m4s[0m 40ms/step - loss: 1.6038

KeyboardInterrupt: 

In [None]:
from pyspark.sql.types import StringType, LongType
schema = StructType([
    StructField("h3_level_4_key", LongType(), True),
    StructField("fish_id", StringType(), True),
    StructField("_decision", IntegerType(), True),
    StructField("depth_class", DoubleType(), True),
    StructField("daytime", IntegerType(), True),
    StructField("period_progress", FloatType(), True),
    StructField("month", IntegerType(), True),
    StructField("_selected", FloatType(), True)
])

In [113]:
def apply_model(iterator):
    depth_classes = [25, 50, 75, 100, 150, 200, 250, 300, 400, 500]
    for df in iterator:
        inputs = []
        for depth_class in depth_classes:
            input = df.copy()
            input['depth_class'] = depth_class
            inputs.append(input)
        # we'd run the model here
        # results = model.predict(inputs)
        results = np.random.rand(df.shape[0], len(depth_classes))
        for i, input in enumerate(inputs):
            input['probability'] = results[:, i]
        yield pd.concat(inputs)
        
            

states = (
    spark.read.format("jdbc").option("url",ps_conn).option("query", states_sql)
    .option("customSchema", "epoch INTEGER, h3_level_4_key BIGINT, depth FLOAT, fish_id STRING, _decision INTEGER")
    .load()
)

states = states.withColumn("depth_class", select_a_class_udf(states["depth"]))
states = states.drop("depth")

states = states.withColumn("geo", h3_to_geo_udf(states["h3_level_4_key"]))
states = states.withColumn("latitude", states["geo"]["latitude"])
states = states.withColumn("longitude", states["geo"]["longitude"])
states = states.drop("geo")

states = states.withColumn("datetime", epoch_to_datetime(states["epoch"]))
states = states.withColumn("sunrise", get_sunrise_udf(states["latitude"], states["longitude"], states["datetime"]))
states = states.withColumn("sunset", get_sunset_udf(states["latitude"], states["longitude"], states["datetime"]))
states = states.withColumn("daytime", get_daytime(states["datetime"], states["sunrise"], states["sunset"]))
states = states.withColumn("period_progress", get_period_progress(states["datetime"], states["sunrise"], states["sunset"], states["daytime"]))
states = states.withColumn("month", get_month(states["datetime"]))

states = states.drop("latitude", "longitude", "sunrise", "sunset", "datetime", "epoch")

#elevation = (
#    spark.read.format("jdbc").option("url",ps_conn).option("query", elevation_sql)
#    .option("customSchema", "h3_level_4_key BIGINT, elevation FLOAT")
#    .load()
#)

#elevation_repartitioned = elevation.repartition("h3_level_4_key")
#states_repartitioned = states.repartition("h3_level_4_key")

#states = states_repartitioned.join(elevation_repartitioned, "h3_level_4_key", "inner")

#states.explain()

from pyspark.sql.types import StringType, LongType

schema = StructType([
    StructField("h3_level_4_key", LongType(), True),
    StructField("fish_id", StringType(), True),
    StructField("_decision", IntegerType(), True),
    StructField("depth_class", DoubleType(), True),
    StructField("daytime", IntegerType(), True),
    StructField("period_progress", FloatType(), True),
    StructField("month", IntegerType(), True),
    StructField("probability", FloatType(), True)
])

states = states.mapInPandas(apply_model, schema=schema)

states.show()



+------------------+-------+---------+-----------+-------+---------------+-----+-----------+
|    h3_level_4_key|fish_id|_decision|depth_class|daytime|period_progress|month|probability|
+------------------+-------+---------+-----------+-------+---------------+-----+-----------+
|595087630329184255|t129843|        1|       25.0|      1|     0.71428573|   12|  0.7611169|
|595087630329184255|t129843|        2|       25.0|      1|     0.71428573|   12| 0.81155527|
|595087630329184255|t129843|        3|       25.0|      1|     0.71428573|   12| 0.73917556|
|595087630329184255|t129843|        4|       25.0|      1|     0.71428573|   12|  0.3960138|
|595087630329184255|t129843|        5|       25.0|      1|     0.85714287|   12| 0.56625473|
|595087630329184255|t129843|        6|       25.0|      1|     0.85714287|   12|   0.769071|
|595087630329184255|t129843|        7|       25.0|      1|     0.85714287|   12|  0.4406883|
|595087630329184255|t129843|        8|       25.0|      1|     0.85714

                                                                                

In [104]:
df = states.toPandas()

                                                                                

In [108]:
def func(df):
    features = ['month', 'daytime', 'period_progress']
    depth_classes = [25, 50, 75, 100, 150, 200, 250, 300, 400, 500]
    inputs = []
    for depth_class in depth_classes:
        input = df[features]
        input['depth_class'] = depth_class
        inputs.append(input)
    # we'd run the model here
    # results = model.predict(inputs)
    results = np.random.rand(df.shape[0], len(depth_classes))
    for i, input in enumerate(inputs):
        input['probability'] = results[:, i]
    return pd.concat(inputs)

func(df)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  input['depth_class'] = depth_class
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  input['depth_class'] = depth_class
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  input['depth_class'] = depth_class
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_in

Unnamed: 0,month,daytime,period_progress,depth_class,probability
0,12,1,0.714286,25,0.278710
1,12,1,0.714286,25,0.477515
2,12,1,0.714286,25,0.938938
3,12,1,0.714286,25,0.060237
4,12,1,0.857143,25,0.524295
...,...,...,...,...,...
1033568,4,1,0.933333,500,0.498545
1033569,4,1,0.933333,500,0.436197
1033570,4,1,0.933333,500,0.685958
1033571,4,1,0.933333,500,0.636298
