In [1]:
import pandas as pd
import geopandas as gpd
import shapely
import sqlalchemy
import psycopg2
import osgeo.gdal
import matplotlib.pyplot as plt
import json
import matplotlib.pyplot as plt
from shapely.geometry import box
import geopandas as gpd
import mlflow
import hyperopt
# xgboost hyperopt mlflow pyspark

In [2]:
df1 = pd.read_csv('animals.csv')
df2 = pd.read_csv('animal_events.csv')
merged_df = pd.merge(df1, df2, on='animal_id', how='right')

# Extraer características temporales
merged_df['hour'] = pd.to_datetime(merged_df['timestamp']).dt.hour
merged_df['month'] = pd.to_datetime(merged_df['timestamp']).dt.month
merged_df['day'] = pd.to_datetime(merged_df['timestamp']).dt.day
merged_df['year'] = pd.to_datetime(merged_df['timestamp']).dt.year
time_merged_df = merged_df
merged_df = merged_df.drop(['timestamp','animal_id'], axis=1)
merged_df

Unnamed: 0,common_name,scientific_name,redlist_cat,megafauna,latitude,longitude,hour,month,day,year
0,Wolf,Canis lupus,Least Concern,no,45.2284,-110.7622,12,9,1,2024
1,Bison,Bison bison,Vulnerable,yes,44.576,-110.6763,12,9,1,2024
2,Elk,Cervus canadensis,Least Concern,yes,44.4232,-111.1061,12,9,1,2024
3,Sierra Nevada bighorn sheep,Ovis canadensis sierrae,Endangered,no,37.9058,-119.7857,12,9,1,2024
4,Sierra Nevada red fox,Vulpes vulpes necator,Critically Endangered,no,37.7896,-119.6426,12,9,1,2024
5,Bobcat,Lynx rufus,Least Concern,yes,37.8829,-119.7608,12,9,1,2024
6,Mule deer,Odocoileus hemionus,Least Concern,yes,36.372,-113.1627,12,9,1,2024
7,Desert bighorn sheep,Ovis canadensis nelsoni,Near Threatened,yes,36.6193,-112.3388,12,9,1,2024
8,Gray fox,Urocyon cinereoargenteus,Least Concern,yes,36.3388,-112.119,12,9,1,2024
9,Wolf,Canis lupus,Least Concern,no,44.3946,-110.8218,13,9,1,2024


In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Session").getOrCreate()

# train_df, test_df = merged_df.randomSplit([.8, .1], seed=42)
# create DataFrame
df_spark = spark.createDataFrame(merged_df)
df_spark.show()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/21 11:11:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+--------------------+--------------------+--------------------+---------+--------+---------+----+-----+---+----+
|         common_name|     scientific_name|         redlist_cat|megafauna|latitude|longitude|hour|month|day|year|
+--------------------+--------------------+--------------------+---------+--------+---------+----+-----+---+----+
|                Wolf|         Canis lupus|       Least Concern|       no| 45.2284|-110.7622|  12|    9|  1|2024|
|               Bison|         Bison bison|          Vulnerable|      yes|  44.576|-110.6763|  12|    9|  1|2024|
|                 Elk|   Cervus canadensis|       Least Concern|      yes| 44.4232|-111.1061|  12|    9|  1|2024|
|Sierra Nevada big...|Ovis canadensis s...|          Endangered|       no| 37.9058|-119.7857|  12|    9|  1|2024|
|Sierra Nevada red...|Vulpes vulpes nec...|Critically Endang...|       no| 37.7896|-119.6426|  12|    9|  1|2024|
|              Bobcat|          Lynx rufus|       Least Concern|      yes| 37.8829|-119.

In [4]:
train_df, val_df, test_df = df_spark.randomSplit([.8,0.1,.1], seed=42)

In [5]:
train_df.show()
val_df.show()
test_df.show()

+--------------------+--------------------+--------------------+---------+--------+---------+----+-----+---+----+
|         common_name|     scientific_name|         redlist_cat|megafauna|latitude|longitude|hour|month|day|year|
+--------------------+--------------------+--------------------+---------+--------+---------+----+-----+---+----+
|               Bison|         Bison bison|          Vulnerable|      yes|  44.576|-110.6763|  12|    9|  1|2024|
|                Wolf|         Canis lupus|       Least Concern|       no| 45.2284|-110.7622|  12|    9|  1|2024|
|Sierra Nevada big...|Ovis canadensis s...|          Endangered|       no| 37.9058|-119.7857|  12|    9|  1|2024|
|              Bobcat|          Lynx rufus|       Least Concern|      yes| 37.8829|-119.7608|  12|    9|  1|2024|
|Desert bighorn sheep|Ovis canadensis n...|     Near Threatened|      yes| 36.6193|-112.3388|  12|    9|  1|2024|
|           Mule deer| Odocoileus hemionus|       Least Concern|      yes|  36.372|-113.

In [6]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator


categorical_cols = [field for (field, dataType) in train_df.dtypes if dataType == "string"]
index_output_cols = [x + "Index" for x in categorical_cols]

string_indexer = StringIndexer(inputCols=categorical_cols, outputCols=index_output_cols, handleInvalid="skip")

numeric_cols = [field for (field, dataType) in train_df.dtypes if ((dataType == "double") & (field != "latitude") & (field != "longitude"))]
assembler_inputs = index_output_cols + numeric_cols
vec_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

In [7]:
rf_lat = RandomForestRegressor(labelCol="latitude", maxBins=40, seed=42)
pipeline_lat = Pipeline(stages=[string_indexer, vec_assembler, rf_lat])
regression_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="latitude")

def objective_function(params):    
    # set the hyperparameters that we want to tune
    max_depth = params["max_depth"]
    num_trees = params["num_trees"]

    with mlflow.start_run():
        estimator = pipeline_lat.copy({rf_lat.maxDepth: max_depth, rf_lat.numTrees: num_trees})
        model = estimator.fit(train_df)

        preds = model.transform(val_df)
        rmse = regression_evaluator.evaluate(preds)
        # mlflow.log_metric("rmse", rmse)

    return rmse


from hyperopt import hp

search_space = {
    "max_depth": hp.quniform("max_depth", 2, 10, 1),
    "num_trees": hp.quniform("num_trees", 10, 100, 1)
}



from hyperopt import fmin, tpe, Trials
import numpy as np
import mlflow
import mlflow.spark
mlflow.pyspark.ml.autolog(log_models=True)

num_evals = 2
trials = Trials()
best_hyperparam = fmin(fn=objective_function, 
                       space=search_space,
                       algo=tpe.suggest, 
                       max_evals=num_evals,
                       trials=trials,
                       rstate=np.random.default_rng(42))


best_max_depth = best_hyperparam["max_depth"]
best_num_trees = best_hyperparam["num_trees"]
estimator = pipeline_lat.copy({rf_lat.maxDepth: best_max_depth, rf_lat.numTrees: best_num_trees})
combined_df = train_df.union(val_df) # Combine train & validation together

pipeline_model_lat = estimator.fit(combined_df)
pred_df_lat = pipeline_model_lat.transform(test_df)
rmse_lat = regression_evaluator.evaluate(pred_df_lat)

  0%|          | 0/2 [00:00<?, ?trial/s, best loss=?]

The git executable must be specified in one of the following ways:
    - be included in your $PATH
    - be set via $GIT_PYTHON_GIT_EXECUTABLE
    - explicitly set via git.refresh(<full-path-to-git-executable>)

All git commands will error until this is rectified.

This initial message can be silenced or aggravated in the future by setting the
$GIT_PYTHON_REFRESH environment variable. Use one of the following values:
    - quiet|q|silence|s|silent|none|n|0: for no message or exception
    - error|e|exception|raise|r|2: for a raised exception

Example:
    export GIT_PYTHON_REFRESH=quiet



24/09/21 11:11:26 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 40 to 23 (= number of training instances)



 50%|█████     | 1/2 [00:23<00:23, 23.18s/trial, best loss: 0.6111546672503316]

24/09/21 11:11:47 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 40 to 23 (= number of training instances)
24/09/21 11:11:48 WARN CacheManager: Asked to cache already cached data.



100%|██████████| 2/2 [00:42<00:00, 21.08s/trial, best loss: 0.6111546672503316]

2024/09/21 11:12:04 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '2dbf6e94d15c4212afd864c5def9e669', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow





24/09/21 11:12:06 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 40 to 25 (= number of training instances)

In [8]:
rf_long = RandomForestRegressor(labelCol="latitude", maxBins=40, seed=42)
pipeline_long = Pipeline(stages=[string_indexer, vec_assembler, rf_long])
regression_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="longitude")

def objective_function(params):    
    # set the hyperparameters that we want to tune
    max_depth = params["max_depth"]
    num_trees = params["num_trees"]

    with mlflow.start_run():
        estimator = pipeline_long.copy({rf_long.maxDepth: max_depth, rf_long.numTrees: num_trees})
        model = estimator.fit(train_df)

        preds = model.transform(val_df)
        rmse = regression_evaluator.evaluate(preds)
        # mlflow.log_metric("rmse", rmse)

    return rmse


from hyperopt import hp

search_space = {
    "max_depth": hp.quniform("max_depth", 2, 10, 1),
    "num_trees": hp.quniform("num_trees", 10, 100, 1)
}



from hyperopt import fmin, tpe, Trials
import numpy as np
import mlflow
import mlflow.spark
mlflow.pyspark.ml.autolog(log_models=True)

num_evals = 2
trials = Trials()
best_hyperparam = fmin(fn=objective_function, 
                       space=search_space,
                       algo=tpe.suggest, 
                       max_evals=num_evals,
                       trials=trials,
                       rstate=np.random.default_rng(42))


best_max_depth_long = best_hyperparam["max_depth"]
best_num_trees = best_hyperparam["num_trees"]
estimator = pipeline_long.copy({rf_long.maxDepth: best_max_depth, rf_long.numTrees: best_num_trees})
combined_df = train_df.union(val_df) # Combine train & validation together

pipeline_model_long = estimator.fit(combined_df)
pred_df_long = pipeline_model_long.transform(test_df)
rmse_long = regression_evaluator.evaluate(pred_df_long)

  0%|          | 0/2 [00:00<?, ?trial/s, best loss=?]

24/09/21 11:12:25 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 40 to 23 (= number of training instances)
24/09/21 11:12:26 WARN CacheManager: Asked to cache already cached data.




 50%|█████     | 1/2 [00:17<00:17, 17.23s/trial, best loss: 156.9797330471338]

24/09/21 11:12:42 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 40 to 23 (= number of training instances)
24/09/21 11:12:43 WARN CacheManager: Asked to cache already cached data.



100%|██████████| 2/2 [00:33<00:00, 16.65s/trial, best loss: 156.9797330471338]

2024/09/21 11:12:57 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '3c8adcb61f7547369c6184272beef549', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow





24/09/21 11:12:58 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 40 to 25 (= number of training instances)
24/09/21 11:12:59 WARN CacheManager: Asked to cache already cached data.

In [9]:
import os
from pyspark.ml import PipelineModel

# Define the path to the saved model
model_path = "/home/jovyan/work/pipeline_model_lat"

# Step 1: Check if the model directory exists
if os.path.exists(model_path):
    # Step 2: Delete the existing model directory
    import shutil
    shutil.rmtree(model_path)

In [10]:
pipeline_model_lat.write().overwrite().save("/home/jovyan/work/pipeline_model_lat")
pipeline_model_long.write().overwrite().save("/home/jovyan/work/pipeline_model_long")

                                                                                

In [11]:
from pyspark.ml import PipelineModel
model_lat = PipelineModel.load("/home/jovyan/work/pipeline_model_lat")
model_long = PipelineModel.load("/home/jovyan/work/pipeline_model_long")
model_lat.transform(test_df).select('latitude').show()
model_long.transform(test_df).select('longitude').show()

+--------+
|latitude|
+--------+
|  37.766|
| 36.5492|
+--------+

+---------+
|longitude|
+---------+
|-119.6363|
|-112.7439|
+---------+



In [12]:
test_df.show()

+--------------------+--------------------+---------------+---------+--------+---------+----+-----+---+----+
|         common_name|     scientific_name|    redlist_cat|megafauna|latitude|longitude|hour|month|day|year|
+--------------------+--------------------+---------------+---------+--------+---------+----+-----+---+----+
|Sierra Nevada big...|Ovis canadensis s...|     Endangered|       no|  37.766|-119.6363|  13|    9|  1|2024|
|Desert bighorn sheep|Ovis canadensis n...|Near Threatened|      yes| 36.5492|-112.7439|  14|    9|  1|2024|
+--------------------+--------------------+---------------+---------+--------+---------+----+-----+---+----+



In [13]:
from pyspark.ml import PipelineModel
def predict_migration(hours,merged_df):
    
    for hour in hours:
        data = {
            'animal_id': ['A001', 'A002', 'A003', 'A004', 'A005', 'A006', 'A007', 'A008','A009'],
            'common_name': ['Wolf', 'Bison', 'Elk', 'Sierra Nevada bighorn sheep', 
                            'Sierra Nevada red fox', 'Bobcat', 'Mule deer', 'Desert bighorn sheep','Gray fox'],
            'scientific_name': ['Canis lupus', 'Bison bison', 'Cervus canadensis', 
                                'Ovis canadensis sierrae', 'Vulpes vulpes necator', 
                                'Lynx rufus', 'Odocoileus hemionus', 'Ovis canadensis nelsoni','Urocyon cinereoargenteus'],
            'redlist_cat': ['Least Concern', 'Vulnerable', 'Least Concern', 'Endangered', 
                            'Critically Endangered', 'Least Concern', 'Least Concern', 'Near Threatened','Least Concern'],
            'megafauna': ['no', 'yes', 'yes', 'no', 'no', 'yes', 'yes', 'yes','yes'],
            'timestamp': ['2024-09-01 {}:00:00'.format(hour), '2024-09-01 {}:00:00'.format(hour), '2024-09-01 {}:00:00'.format(hour), 
                          '2024-09-01 {}:00:00'.format(hour), '2024-09-01 {}:00:00'.format(hour), '2024-09-01 {}:00:00'.format(hour), 
                          '2024-09-01 {}:00:00'.format(hour), '2024-09-01 {}:00:00'.format(hour), '2024-09-01 {}:00:00'.format(hour)],
            'latitude': [45.2284, 44.5760, 44.4232, 37.9058, 37.7896, 37.8829, 36.3720, 36.6193, 36.3388],
            'longitude': [-110.7622, -110.6763, -111.1061, -119.7857, -119.6426, -119.7608, -113.1627, -112.3388, -112.1190]
        }
        df = pd.DataFrame(data)

        # Extraer características temporales
        df['hour'] = pd.to_datetime(df['timestamp']).dt.hour
        df['month'] = pd.to_datetime(df['timestamp']).dt.month
        df['day'] = pd.to_datetime(df['timestamp']).dt.day
        df['year'] = pd.to_datetime(df['timestamp']).dt.year

        # df1=df.drop(['timestamp','animal_id'], axis=1, inplace=False)

        df1 = spark.createDataFrame(df)
        
        model_lat = PipelineModel.load("/home/jovyan/work/pipeline_model_lat")
        model_long = PipelineModel.load("/home/jovyan/work/pipeline_model_long")
        lat_pred = model_lat.transform(df1).select('latitude').rdd.flatMap(lambda x: x).collect()
        long_pred= model_long.transform(df1).select('longitude').rdd.flatMap(lambda x: x).collect()
        df1 = df1.toPandas()
        df1['timestamp']=df['timestamp']
        df1['animal_id']=df['animal_id']
        df1['latitude'] = lat_pred
        df1['longitude'] =long_pred
        predicted_df = pd.concat([merged_df, df1], ignore_index=True)
    return predicted_df

In [14]:
predicted_df = predict_migration(["15","16","17","18"],time_merged_df)

                                                                                

In [15]:
predicted_df

Unnamed: 0,animal_id,common_name,scientific_name,redlist_cat,megafauna,timestamp,latitude,longitude,hour,month,day,year
0,A001,Wolf,Canis lupus,Least Concern,no,2024-09-01 12:00:00,45.2284,-110.7622,12,9,1,2024
1,A002,Bison,Bison bison,Vulnerable,yes,2024-09-01 12:00:00,44.576,-110.6763,12,9,1,2024
2,A003,Elk,Cervus canadensis,Least Concern,yes,2024-09-01 12:00:00,44.4232,-111.1061,12,9,1,2024
3,A004,Sierra Nevada bighorn sheep,Ovis canadensis sierrae,Endangered,no,2024-09-01 12:00:00,37.9058,-119.7857,12,9,1,2024
4,A005,Sierra Nevada red fox,Vulpes vulpes necator,Critically Endangered,no,2024-09-01 12:00:00,37.7896,-119.6426,12,9,1,2024
5,A006,Bobcat,Lynx rufus,Least Concern,yes,2024-09-01 12:00:00,37.8829,-119.7608,12,9,1,2024
6,A007,Mule deer,Odocoileus hemionus,Least Concern,yes,2024-09-01 12:00:00,36.372,-113.1627,12,9,1,2024
7,A008,Desert bighorn sheep,Ovis canadensis nelsoni,Near Threatened,yes,2024-09-01 12:00:00,36.6193,-112.3388,12,9,1,2024
8,A009,Gray fox,Urocyon cinereoargenteus,Least Concern,yes,2024-09-01 12:00:00,36.3388,-112.119,12,9,1,2024
9,A001,Wolf,Canis lupus,Least Concern,no,2024-09-01 13:00:00,44.3946,-110.8218,13,9,1,2024


In [16]:
import pandas as pd
from shapely.geometry import Point, LineString
import geopandas as gpd

df = predict_migration(["19"], time_merged_df)

# Convert the 'timestamp' column to datetime format
df['timestamp'] = pd.to_datetime(df['timestamp'])

# Convert the DataFrame into a GeoDataFrame

geometry = [Point(xy) for xy in zip(df['longitude'], df['latitude'])]
gdf = gpd.GeoDataFrame(df, geometry=geometry)


import folium
from folium.plugins import TimestampedGeoJson

# Create a map centered at the average location of the points
m = folium.Map(location=[df['latitude'].mean(), df['longitude'].mean()], zoom_start=6)

# Create a color palette to differentiate the animals
color_map = {
    'Wolf': 'blue',
    'Bison': 'green',
    'Elk': 'red',
    'Sierra Nevada bighorn sheep': 'purple',
    'Sierra Nevada red fox': 'orange',
    'Bobcat': 'darkred',
    'Mule deer': 'darkblue',
    'Desert bighorn sheep': 'pink',
    'Gray fox': 'black'
}

# Create shifted columns (next_latitude and next_longitude)
gdf['next_latitude'] = gdf['latitude'].shift(-1)
gdf['next_longitude'] = gdf['longitude'].shift(-1)

# Create a GeoJSON with the points and add the corresponding time
features = []
for _, row in gdf.iterrows():
    feature = {
        'type': 'Feature',
        'geometry': {
            'type': 'Point',
            'coordinates': [row['longitude'], row['latitude']]
        },
        'properties': {
            'time': row['timestamp'].isoformat(),  # Now the timestamp is of datetime type
            'style': {'color': color_map[row['common_name']]},
            'popup': f"Animal: {row['common_name']}<br>Timestamp: {row['timestamp']}"
        }
    }
    features.append(feature)


        
# Create a GeoJSON compatible with Folium
geojson_data = {
    'type': 'FeatureCollection',
    'features': features
}

# Add the GeoJSON points to the map with timestamp support
TimestampedGeoJson(
    geojson_data,
    period='PT1H',  # The time period for points to appear (can be adjusted)
    add_last_point=True,
    auto_play=True,  # Autoplay
    loop=False,      # Don't loop the cycle
    max_speed=1,     # Maximum playback speed
    loop_button=True,  # Loop option on the map
    date_options='YYYY-MM-DD HH:mm:ss',  # Time format
    time_slider_drag_update=True  # Allow dragging the time slider
).add_to(m)

# Add trajectories (lines) for each animal
for animal in df['common_name'].unique():
    # Filter data for each animal
    animal_data = gdf[gdf['common_name'] == animal].sort_values(by='timestamp')
    
    # Create a list of coordinates for the trajectories
    trajectory = list(zip(animal_data['latitude'], animal_data['longitude']))
    
    # Add the trajectory as a PolyLine to the map
    folium.PolyLine(trajectory, color=color_map[animal], weight=4, opacity=1).add_to(m)


####################################################################
# Read the GeoJSON file
gdf_areas = gpd.read_file("protected_areas.json")

# Convert the GeoDataFrame to GeoJSON
geojson_areas = gdf_areas.to_json()

# Add the GeoJSON data to the map
folium.GeoJson(geojson_areas).add_to(m)


# Display the map
m


                                                                                