# Data Engineering Take-Home Assignment: Nature Conservation & Geospatial Data

## Context
Assume you have been hired as a Data Engineer for an organization focused on nature conservation. The organization is working on a project to monitor and protect natural habitats using satellite data, wildlife sensor data, and geospatial information. Your task is to design and implement a data pipeline that ingests, processes, and analyzes this data to help identify areas needing immediate conservation attention as well as build a model that provides helpful insights related our organization's interests.

## Objective 

Your goal in this assessment is to showcase your curiousity and creativity to design rigorous models and derive interesting insights.  

You'll be given two tasks.

The first is a design task, in which we expect you to diagram and describe how you'd set up a process to injest this data from a live streamed source, assuming you are also paying montoring services to supply this data from scratch. Think about how you might transform and store the data efficiently for querying and analysis and feed it into your model. 

The second task will require you devise interesting questions from preliminary explorations of a subset of migration data, found alongside this notebook, and construct a rigorous model to answer them. Please demonstrate all of your process using this notebook, and most importantly your outputs. 




## Tasks

### 1) Design - Data Ingestion & Storage:
- **Ingestion**: Design and implement a solution to ingest data from three different sources: GeoJSON, CSV, and JSON.
- **Automation**: Ensure the pipeline can handle regular data updates (e.g., daily or hourly).
- **Storage**: Choose appropriate storage solutions for each dataset (e.g., relational database, NoSQL, cloud storage, or data lake). Provide justification for your choices.

### 2) Data Transformation & Analysis:
- **Data Parsing & Cleaning**: 
  - Parse and clean the wildlife tracking data (CSV) and geospatial data (GeoJSON) to ensure consistency.
  - Ensure the data is ready for analysis by standardizing formats, removing errors, and handling missing values.

- **Exploratory Data Analysis**:
  - Investigate the data to understand key characteristics, distributions, and trends.

- **Behavioral Analysis**:
  - Identify more complex animal behaviors:
    - Determine when animals cross the boundaries of protected areas.
    - Analyze potential factors contributing to these crossings (e.g., time, weather, or environmental changes).
    - Calculate the total number of animal entries and exits from protected areas over time.

- **Advanced Insights**:
  - Identify migration paths or clustering patterns.
  - Build a predictive model to anticipate future animal movements or identify risk zones for endangered species.

### 3) Optional Bonus - Visualization/Reporting:
- Provide interactive visualizations to demonstrate your analysis, ideally within this notebook.

### Here are data sources you can use to build your analysis. 

- https://storage.googleapis.com/data-science-assessment/animal_events.csv
- https://storage.googleapis.com/data-science-assessment/animals.csv
- https://storage.googleapis.com/data-science-assessment/protected_areas.json
- https://storage.googleapis.com/data-science-assessment/satellites.json

## Deliverables
#### Design component:
- A clear description and diagrams for the architecture and tools you might used, including any cloud services, databases, or libraries (if applicable). During the discussion we'll go over different scenarios. 

#### Implementation:
- Code for the data pipeline that includes:
  - Data ingestion scripts or setup.
  - Transformation and processing logic.
  - Queries or outputs showcasing the results.
- (Optional) a visualization of the results.

## Data
### 1. **Animal Events - CSV** [Download link](https://storage.googleapis.com/data-science-assessment/animal_events.csv)

- Contains data on animal movement events with details like location and speed.
- **Key Columns**: `event_id`, `animal_id`, `timestamp`, `latitude`, `longitude`, `speed`.

---

### 2. **Animals - CSV** [Download link](https://storage.googleapis.com/data-science-assessment/animals.csv)

- Metadata about tracked animals, including species and conservation status.
- **Key Columns**: `animal_id`, `species`, `endangered`, `animal_type`, `preferred_landcover`.

---

### 3. **Protected Areas - GeoJSON** [Download link](https://storage.googleapis.com/data-science-assessment/protected_areas.json)

- Geospatial data representing protected areas with boundaries and metadata.
- **Key Fields**: `name`, `category`, `protected_area_id`, `geometry`.

---

### 4. **Satellite Metadata - JSON** [Download link](https://storage.googleapis.com/data-science-assessment/satellites.json)

- Metadata from satellite imagery, covering factors like cloud cover and resolution.
- **Key Fields**: `satellite_id`, `start_time`, `last_time`, `frequency`, `bounding_box`, `cloud_cover_percentage`, `resolution`.

---

## Evaluation Criteria

- **Data Engineering Skills**: How well the pipeline handles ingestion, transformation, and storage.
- **Geospatial Data Handling**: Ability to process geospatial data and perform spatial operations (e.g., joins, intersections).
- **Scalability & Efficiency**: The pipeline’s ability to handle larger datasets or more frequent updates.
- **Code Quality**: Structure, readability, and use of best practices.
- **Documentation**: Clear explanations of your approach and any assumptions made.
- **Bonus (Visualization/Reporting)**: Extra points for insightful data visualization or reporting.

## Set up

Feel free to set up this notebook using condo, or your own kernal / virtual environment. To make it easier, you can set up the notebook using this docker with the potentialy libraries you might need. 

#### To start using a prepared Docker image, 
- 1 navigate to this shared folder in your terminal, and then load up docker and run the docker file to pull in needed libraries

```bash
docker build -t geospatial-notebook .
docker run -p 8888:8888 -v $(pwd):/home/nobody/work geospatial-notebook
```


When the container runs, it will display a URL with a token (something like http://127.0.0.1:8888/?token=...). It will probably be something like http://127.0.0.1:8888/tree You can copy this URL into your browser, and you'll open to a Jupyter lab. Your existing notebook will be available inside the container under the work directory.

Anytime you want to work again, just run the following command to start the Docker container and access your notebooks:

```bash
docker run -p 8888:8888 -v $(pwd):/home/nobody/work geospatial-notebook
```

Please be sure your notebook runs by adding the needed dependencies to the requirements doc. I would encourage you to avoid external dependencies, like postgres setup for the implementation of your work so that the notebook work works without significant setup. You can demonstrate your knowledge of various infrastructure in your design submission. 

Critiques of this assignment are also welcomed and will contribute to this score. `


### Architecture

![title](output/Cultivo_v4.png)

In [229]:
# Libraries you may or may not need
import numpy as np

import pandas as pd

import geopandas as gpd

import plotly.graph_objects as go
import plotly.express as px
from plotly import offline
offline.init_notebook_mode(connected=True)

import shapely
from shapely.geometry import Point, Polygon, box

import json

import matplotlib
import matplotlib.pyplot as plt

import warnings
# import sqlalchemy
# import psycopg2
# import osgeo.gdal

In [2]:
INPUT_FILE_PATH = "./data/"
OUTPUT_FILE_PATH = "./data/"
defualt_plotly_config = {'scrollZoom': True}

### load the needed datapoints


In [432]:

# loads original animal_events and animals
animal_events_df = pd.read_csv(f"{INPUT_FILE_PATH}animal_events.csv")
animal_df = pd.read_csv(f"{INPUT_FILE_PATH}animals.csv")


# load protected areas
protected_areas_gdf = gpd.read_file(f"{INPUT_FILE_PATH}protected_areas.json")

# load satellites area
with open(f"{INPUT_FILE_PATH}satellites.json") as f:
    satellites_dict = json.load(f)

### ETL functions

In [494]:
def formatter(raw_df: pd.DataFrame, config: dict, logger=None):
    """
    Format the specified columns in a DataFrame to their designated data types.

    Args:
        raw_df (pd.DataFrame): The input DataFrame, typically unformatted and 
            generated from a function like `pd.read_csv()`.
        config (dict[str, str]): A dictionary where each key is a column name in `df`, 
            and the corresponding value is the target data type to which the column should be cast.
        logger (Any, optional): A logger object used to record any error or warning messages that 
            occur during the formatting process. Defaults to None.

    Returns:
        pd.DataFrame: The modified DataFrame with columns cast to the specified data types.

    Raises:
        ValueError: If the column is not found in the DataFrame or the conversion fails.
    """
    _supported_dtypes = set([
        'int64',
        'int32',
        'int16',
        'int8',
        'float64',
        'float32',
        'float16',
        'bool',
        'str',
        'category'
    ])
    
    df = raw_df.copy()
    
    for column, dtype in config.items():
        if dtype not in _supported_dtypes:
            if logger:
                logger.error(
                    f"{dtype} is not supported." \
                    f"Supported dtype: {' ,'.join(_supported_dtypes)}"
                )
            raise ValueError(
                f"{dtype} is not supported." \
                f"Supported dtype: {' ,'.join(_supported_dtypes)}"
            )
        
        if column not in df.columns:
            if logger:
                logger.warning(f"Column '{column}' not found in DataFrame. Skipping...")
            warnings.warn(f"Column '{column}' not found in DataFrame. Skipping...")
            continue

        try:
            df[column] = df[column].astype(dtype)
        except Exception as e:
            if logger:
                logger.error(
                    f"Error converting column '{column}' to {dtype}. {str(e)}."
                )
            raise ValueError(
                    f"Error converting column '{column}' to {dtype}. {str(e)}."
                )

    return df


In [400]:
test_df = pd.read_csv(f"{INPUT_FILE_PATH}animal_events.csv")
config = {
    'latitude': 'float64',
    'longitude': 'float64'
}
test_df_1 = formatter(test_df, config)
test_df_1.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 27 entries, 0 to 26
Data columns (total 4 columns):
 #   Column     Non-Null Count  Dtype  
---  ------     --------------  -----  
 0   animal_id  27 non-null     object 
 1   timestamp  27 non-null     object 
 2   latitude   27 non-null     float64
 3   longitude  27 non-null     float64
dtypes: float64(2), object(2)
memory usage: 992.0+ bytes


In [495]:
def str_formatter(raw_df: pd.DataFrame, config: dict, logger=None):
    """
    Format the specified columns in a DataFrame to their designated data types.

    Args:
        raw_df (pd.DataFrame): The input DataFrame, typically unformatted and 
            generated from a function like `pd.read_csv()`.
        config (dict[dict[str, Any]]): An outer dictionary where each key represents a column name in `df`. 
            The corresponding value is another dictionary, with keys as the mapped string values and 
            values as the associated mapping values.
        logger (Any, optional): A logger object used to record any error or warning messages that 
            occur during the formatting process. Defaults to None.

    Returns:
        pd.DataFrame: The modified DataFrame.

    Raises:
        ValueError: If the column is not found in the DataFrame or the conversion fails.
    """
    
    df = raw_df.copy()
    
    for column, mapping_dict in config.items():        
        if column not in df.columns:
            if logger:
                logger.warning(f"Column '{column}' not found in DataFrame. Skipping...")
            warnings.warn(f"Column '{column}' not found in DataFrame. Skipping...")
            continue

        try:
            df[column] = df[column].apply(lambda x: mapping_dict[x])
        except Exception as e:
            if logger:
                logger.error(
                    f"Error converting column '{column}'. {str(e)}."
                )
            raise ValueError(
                    f"Error converting column '{column}'. {str(e)}."
                )

    return df


In [407]:
test_str_df = pd.read_csv(f"{INPUT_FILE_PATH}animals.csv")
config = {
    'megafauna': {
        'no': False,
        'yes': True
    },
}
test_str_df1 = str_formatter(test_str_df, config)
test_str_df1.info()
test_str_df1[:2]

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9 entries, 0 to 8
Data columns (total 5 columns):
 #   Column           Non-Null Count  Dtype 
---  ------           --------------  ----- 
 0   animal_id        9 non-null      object
 1   common_name      9 non-null      object
 2   scientific_name  9 non-null      object
 3   redlist_cat      9 non-null      object
 4   megafauna        9 non-null      bool  
dtypes: bool(1), object(4)
memory usage: 425.0+ bytes


Unnamed: 0,animal_id,common_name,scientific_name,redlist_cat,megafauna
0,A001,Wolf,Canis lupus,Least Concern,False
1,A002,Bison,Bison bison,Vulnerable,True


In [496]:
def date_formatter(raw_df: pd.DataFrame, time_config: dict, logger=None):
    """
    Adds a new 'geometry' column containing `shapely.geometry.Point` objects based on the given 
    latitude and longitude columns.
    
    Args:
        raw_df (pd.DataFrame): The input DataFrame to modify.
        time_config (dict[str, str]): A dictionary where each key is a column name in `df`, 
            and the corresponding value is the time format.
        logger (Any): A logger object used to record error and warning messages.
    
    Returns:
        pd.DataFrame: The modified DataFrame.
    """
    df = raw_df.copy()
    
    for column, time_format in time_config.items():
        if column not in df.columns:
            if logger:
                logger.warning(f"Column '{column}' not found in DataFrame. Skipping...")
            warnings.warn(f"Column '{column}' not found in DataFrame. Skipping...")
            continue

        try:
            df[column] = pd.to_datetime(df[column], format=time_format)

        except Exception as e:
            if logger:
                logger.error(
                    f"Error converting column '{column}' with format {time_format}. {str(e)}."
                )
            raise ValueError(
                    f"Error converting column '{column}' with format {time_format}. {str(e)}."
                )
    return df


In [348]:
test_df_2 = date_formatter(test_df_1, {'timestamp': '%Y-%m-%d %H:%M:%S'})
test_df_2.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 27 entries, 0 to 26
Data columns (total 4 columns):
 #   Column     Non-Null Count  Dtype         
---  ------     --------------  -----         
 0   animal_id  27 non-null     object        
 1   timestamp  27 non-null     datetime64[ns]
 2   latitude   27 non-null     float64       
 3   longitude  27 non-null     float64       
dtypes: datetime64[ns](1), float64(2), object(1)
memory usage: 992.0+ bytes


In [332]:
test_time_df = pd.DataFrame({'time1': ['2024-06-01'], 'time2': ['06-02-08'], 'time3': ['06_02_08']})
time_config = {
    'time1': '%Y-%m-%d',
    'time2': '%m-%d-%y',
    'time3': '%m_%d_%y',
}
test_time_df_1 = date_formatter(test_time_df, time_config)
test_time_df_1

Unnamed: 0,time1,time2,time3
0,2024-06-01,2008-06-02,2008-06-02


In [497]:
def time_extractor(raw_df: pd.DataFrame, time_column: str, logger=None):
    """
    Extract the year, month, day, and date from a specified time column.
    
    Args:
        raw_df (pd.DataFrame): The DataFrame to be modified.
        time_column (str): The name of the column containing time data.
        logger (Any): An optional logger object for recording error and warning messages.
        
    Returns:
        pd.DataFrame: The updated DataFrame, which includes the newly created `year`, `month`,
            `day`, and `date` columns.
    """
    df = raw_df.copy()
    
    try:
        # Extract year, month, and day
        df['year'] = df[time_column].dt.year
        df['month'] = df[time_column].dt.month
        df['day'] = df[time_column].dt.day
        df['date'] = df[time_column].dt.strftime('%Y-%m-%d')
    except Exception as e:
        if logger:
            logger.error(
                f"Error extracing time elements from column '{time_column}'. {str(e)}." 
            )
        raise ValueError(
                f"Error extracing time elements from column '{time_column}'. {str(e)}."
            ) 
    return df


In [517]:
def geo_formatter(df: pd.DataFrame, lat_column: str, lon_column: str, logger=None):
    """
    Adds a new 'geometry' column containing `shapely.geometry.Point` objects based on the given 
    latitude and longitude columns.
    
    Args:
        df (pd.DataFrame): The input DataFrame to modify.
        lat_column (str | float): The name of the column representing the latitude values.
        lon_column (str | float): The name of the column representing the longitude values.
        logger (Any): A logger object used to record error and warning messages.
    
    Returns:
        pd.DataFrame: The modified DataFrame with the newly generated 'geometry' column.
    """
    try:
        df['geometry'] = df.apply(lambda row: Point(row[lon_column], row[lat_column]), axis=1)
    except Exception as e:
        if logger:
            logger.error(
                f"Error converting column '{lat}' and '{lon}' to geometry. {str(e)}." 
            )
        raise ValueError(
                f"Error converting column '{lat}' and '{lon}' to geometry. {str(e)}." 
            ) 
    return df


In [335]:
# testing 
test_df_3 = geo_formatter(test_df_2, 'latitude', 'longitude')
test_df_3.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 27 entries, 0 to 26
Data columns (total 5 columns):
 #   Column     Non-Null Count  Dtype         
---  ------     --------------  -----         
 0   animal_id  27 non-null     object        
 1   timestamp  27 non-null     datetime64[ns]
 2   latitude   27 non-null     float64       
 3   longitude  27 non-null     float64       
 4   geometry   27 non-null     object        
dtypes: datetime64[ns](1), float64(2), object(2)
memory usage: 1.2+ KB


In [498]:
def generating_gdf(df: pd.DataFrame, logger=None):
    """
    Converts a DataFrame to a GeoDataFrame.

    Args:
        df (pd.DataFrame): A formatted DataFrame containing a 'geometry' column with valid 
        geometric data (e.g., `shapely.geometry.Point` objects).
        logger (Any): A logger object used to record error and warning messages.
    
    Returns:
        gdf (gpd.GeoDataFrame): A GeoDataFrame generated from the input DataFrame.
    """
    try:
        gdf = gpd.GeoDataFrame(df, geometry='geometry')
    except Exception as e:
        if logger:
            logger.error(
                f"Error converting DataFame to GeoDataFrame. {str(e)}." 
            )
        raise ValueError(
                f"Error converting DataFame to GeoDataFrame. {str(e)}." 
            )  
    return(gdf)

In [339]:
test_gdf = generating_gdf(test_df_3)
test_gdf.info()

<class 'geopandas.geodataframe.GeoDataFrame'>
RangeIndex: 27 entries, 0 to 26
Data columns (total 5 columns):
 #   Column     Non-Null Count  Dtype         
---  ------     --------------  -----         
 0   animal_id  27 non-null     object        
 1   timestamp  27 non-null     datetime64[ns]
 2   latitude   27 non-null     float64       
 3   longitude  27 non-null     float64       
 4   geometry   27 non-null     geometry      
dtypes: datetime64[ns](1), float64(2), geometry(1), object(1)
memory usage: 1.2+ KB


In [367]:
def error_checker(gdf: gpd.GeoDataFrame, config: dict[str, tuple], logger=None):
    """
    Checks if data in a reasonable range. 

    Args:
        gdf (gpd.GeoDataFrame): A GeoDataFrame containing the data to be tested.
        config (dict[str, tuple[int|float, int|float]]): A dictionary where each key 
            represents a column name in the GeoDataFrame, and the corresponding value is a 
            tuple defining the acceptable range (min, max) for that column.
        logger (Any): A logger object used to record error and warning messages.
    
    """
    for column, val_limit in config.items():
        if column not in gdf.columns:
            if logger:
                logger.warning(f"Column '{column}' not found in DataFrame. Skipping...")
            warnings.warn(f"Column '{column}' not found in DataFrame. Skipping...")
            continue
        
        min_value, max_value = val_limit
        if not ((gdf[column] >= min_value).all() & (gdf[column] <= max_value).all()):
            if logger:
                logger.error(
                    f"The value in column {column} is not in the range of {min_value} and {max_value}." 
                )
            raise ValueError(
                    f"The value in column {column} is not in the range of {min_value} and {max_value}." 
                )

In [370]:
error_checker(test_gdf, {'latitude': (-90, 90), 'longitude': (-180, 180)})

In [368]:
# negative testing: make sure it is working
test_value_df = pd.DataFrame({'a': [100, 5, 9, 2, 3]})
error_checker(test_value_df, {'a': (0, 10)})

ValueError: The value in column a is not in the range of 0 and 10.

In [499]:
def null_handler(raw_gdf: gpd.GeoDataFrame, policy: str, logger=None):
    """
    Checks if data in a reasonable range. 

    Args:
        raw_gdf (gpd.GeoDataFrame): A GeoDataFrame containing the data to be tested.
        policy (str): ['drop', 'mean', 'median']. 
        logger (Any): A logger object used to record error and warning messages.
    Return:
        gdf (gpd.GeoDataFrame): The modifid gdf
    
    """
    assert policy in ['drop', 'mean', 'median']
    
    gdf = raw_gdf.copy()
    
    try:
        if policy == 'drop':
            gdf.dropna(axis=0, inplace=True)
        elif policy == 'mean':
            for column in gdf.columns:
                if pd.api.types.is_numeric_dtype(gdf[column]):
                    # gdf[column].fillna(gdf.mean(), inplace=True)
                    gdf.loc[gdf[column].isna(), column] = gdf[column].mean()
        elif policy == 'median':
            for column in gdf.columns:
                if pd.api.types.is_numeric_dtype(gdf[column]):
                    gdf.loc[gdf[column].isna(), column] = gdf[column].median()
        
    except Exception as e:
        if logger:
            logger.error(
                f"Error filling na with the policy {policy}. {str(e)}." 
            )
        raise ValueError(
                f"Error filling na with the policy  {policy}. {str(e)}." 
            )   
    if gdf.isna().any().any():
        if logger:
            logger.error(
                f"Null values are still present in the GeoDataFrame. Please inspect the non-numeric columns" 
            )
        raise ValueError(
                f"Null values are still present in the GeoDataFrame. Please inspect the non-numeric columns" 
            )   
    return(gdf)

In [387]:
test_gdf = null_handler(test_gdf, policy='mean')

In [385]:
test_na_df = pd.DataFrame({'a': [1, 2, 3, 2], 'b':[None, 5, 6, 1]})
null_handler(test_na_df, policy='median')

Unnamed: 0,a,b
0,1,5.0
1,2,5.0
2,3,6.0
3,2,1.0


Create a mesh for the specified area using the given grid size. This approach allows for easy aggregation of information in a clear and organized manner. For instance, it can be used to summarize data such as total wolf occurrences, geospatial patterns, and demographic information.

In [492]:
def create_mesh(xmin: float, ymin: float, xmax: float, ymax: float, mesh_size: float):
    """
    Generate regions on a map.

    TODO: Implement differnet methods to generate shapes of mesh, such as triangles.

    Args:
        xmin (float): Minimum longitude.
        ymin (float): Minimum latitude.
        xmax (float): Maximum longitude.
        ymax (float): Maximum latitude.
        mesh_size (float): Size of the mesh.

    Returns:
        mesh_df (gpd.GeoDataFrame): A GeoDataFrame containing the designated region IDs.
    """
    region_id = 0
    region_list = []
    
    x_coords = np.linspace(xmin, xmax, mesh_size)
    y_coords = np.linspace(ymin, ymax, mesh_size)
    
    polygons = []
    for x_idx in range(1, len(x_coords)):
        for y_idx in range(1, len(y_coords)):
            
            polygons.append(box(x_coords[x_idx-1], y_coords[y_idx-1], x_coords[x_idx], y_coords[y_idx]))
            
            region_list.append(region_id)
            region_id += 1
            
    return gpd.GeoDataFrame({"region_id": region_list}, geometry=polygons)

In [500]:
def mapping_to_mesh(gdf, mesh_gdf):
    """
    Map `Point` to the corresponding region id

    Args:
        gdf (gpd.GeoDataFrame): mapping gdf
        mesh_gdf (gpd.GeoDataFrame): generated mesh gdf

    Returns:
        mesh_df (gpd.GeoDataFrame): The modified GeoDataFrame containing the designated region IDs 
            corresponding to the lat and lon.
    """
    output_gdf = gpd.sjoin(gdf, mesh_gdf, how="left", predicate="within")
    # output_gdf = output_gdf.dropna()
    return(output_gdf)

## ETL on animal activity df

In [585]:
format_config = {
    'latitude': 'float64',
    'longitude': 'float64',
    'animal_id': 'str',
    'common_name': 'str',
    'scientific_name': 'str',
    'redlist_cat': 'category',
    'megafauna': 'str'
}
str_config = {
    'megafauna': {
        'no': False,
        'yes': True
    },
}
time_config = {
    'timestamp': '%Y-%m-%d %H:%M:%S'
}

# merge animal_events_df with animal_df as animal_full_df
animal_full_df = animal_events_df.merge(animal_df, on='animal_id', how='left')

animal_full_df1 = formatter(animal_full_df, format_config)
animal_full_df2 = str_formatter(animal_full_df1, str_config)
animal_full_df3 = date_formatter(animal_full_df2, time_config)
animal_full_df4 = time_extractor(animal_full_df3, 'timestamp')
animal_full_df5 = geo_formatter(animal_full_df4, 'latitude', 'longitude')
animal_full_gdf = generating_gdf(animal_full_df5)
error_checker(animal_full_gdf, {'latitude': (-90, 90), 'longitude': (-180, 180)})
animal_full_gdf = null_handler(animal_full_gdf, policy='drop')

## Generate regional data

In [586]:
mesh_gdf = create_mesh(-111.10, 44.2, -110.10, 44.6, 10)
animal_full_region_gdf = mapping_to_mesh(animal_full_gdf, mesh_gdf)

In [523]:
animal_full_region_gdf.groupby(['region_id']).size().reset_index()

Unnamed: 0,region_id,0
0,16.0,1
1,18.0,1
2,22.0,1
3,25.0,1
4,34.0,1
5,35.0,1
6,77.0,1


### visualize animal events, protected areas, and satellite areas

In [444]:
def plot(
        satellites_dict: dict, 
        protected_areas_gdf: gpd.GeoDataFrame,  
        animal_full_gdf: gpd.GeoDataFrame,
        show_satellites: bool = False,
        fig: go.Figure = None,
        mapbox: dict=None
        
) -> go.Figure:
    """
    Visualize spatial data using Plotly, including animal observation locations, protected areas, 
    and satellite coverage.

    Args:
        satellites_dict (list[dict]): A list of dictionaries imported from "satellites.json", 
            where each dictionary contains a bounding_box defining the satellite coverage area.
        protected_areas_gdf (gpd.GeoDataFrame): A GeoDataFrame containing information about designated 
            protected areas.
        animal_full_gdf (gpd.GeoDataFrame): A GeoDataFrame containing information about animal activities 
            and their corresponding properties.
        show_satellites (bool): A flag indicating whether to display satellite coverage areas 
            in the visualization.
        fig (go.Figure): Given if want to append layers on 
            existing Figure
        mapbox (dict): customized argument for 
            Figure showing.  
    Returns:
        Figure: Plotly Figure object.
    """
    if fig is None:
        fig = go.Figure()
        
    if mapbox is None:
        mapbox = dict(
            center=dict(lat=40, lon=-110),
            zoom=3,  # Adjust the zoom level as needed
        )
    
    
    if show_satellites:
        # Add satellite area
        for data_idx, data in enumerate(satellites_dict):
            # if data_idx == 1:
                # This is satellite image cover all earth
            #    continue
            # Extract bounding box coordinates
            bounding_box = data['bounding_box']
            lon = [bounding_box['xmin'], bounding_box['xmin'], bounding_box['xmax'], bounding_box['xmax']]
            lat = [bounding_box['ymin'], bounding_box['ymax'], bounding_box['ymax'], bounding_box['ymin']]

            # Add a filled polygon for the bounding box
            fig.add_trace(go.Scattermapbox(
                lat=lat,
                lon=lon,
                fill='toself',
                fillcolor='rgba(255, 0, 0, 0.1)', 
                line=dict(color='black'),
                name='Bounding Box'
            ))



    # Add protected area
    for _, row in protected_areas_gdf.iterrows():
        # Extract coordinates from the geometry
        lon, lat = row['geometry'].exterior.xy 

        # Convert coordinates to lists
        lon = list(lon)
        lat = list(lat)

        # Add the polygon to the figure with filled area
        fig.add_trace(go.Scattermapbox(
            lat=lat,
            lon=lon,
            fill='toself',  
            fillcolor='rgba(128, 128, 128, 0.5)',  
            line=dict(color='black'),  
            name=row.get('name', 'Area')  
        ))


    # Add animal activities
    # Unique common names to assign colors
    unique_names = animal_full_gdf['common_name'].unique()
    color_map = {name: i for i, name in enumerate(unique_names)}

    # Add traces for scatter points colored by common_name
    for name in unique_names:
        subset = animal_full_gdf[animal_full_gdf['common_name'] == name]
        
        
        hover_text = [
            f"Animal ID: {row['animal_id']}<br>"
            f"Scientific Name: {row['scientific_name']}<br>"
            f"Redlist Category: {row['redlist_cat']}<br>"
            f"Megafauna: {row['megafauna']}<br>"
            f"Latitude: {row['latitude']}<br>"
            f"Longitude: {row['longitude']}"
            for _, row in subset.iterrows()
        ]
        
        
        fig.add_trace(go.Scattermapbox(
            lat=subset['latitude'],
            lon=subset['longitude'],
            mode='markers',
            name=name,
            
            marker=dict(size=10, color=color_map[name]), 
            showlegend=True,
            text=hover_text,
            hoverinfo='text'
        ))




    fig.update_layout(
        mapbox_style="open-street-map",
        mapbox=mapbox,
        
    )

    
    return(fig)

In [608]:
fig = plot(satellites_dict, protected_areas_gdf, animal_full_gdf, show_satellites=True)
fig.show(config=defualt_plotly_config)

![title](output/satellite_vis.png)

In [610]:
fig = plot(satellites_dict, protected_areas_gdf, animal_full_gdf)
fig.show(config=defualt_plotly_config)

![title](output/animal_vis.png)

In [198]:
def plot_t(
        satellites_dict: dict, 
        protected_areas_gdf: gpd.GeoDataFrame,  
        animal_full_gdf: gpd.GeoDataFrame,
        show_satellites: bool = False,
        fig: go.Figure = None,
        time: str = None,
        mapbox: dict = None
        
) -> go.Figure:
    """
    Visualize spatial data using Plotly, including animal observation locations, protected areas, 
    and satellite coverage. This version allows for specifying a `time` parameter to filter and display 
    animal observations for a given date.
    
    TODO: 
    1. Optimize with a time slider to show data across all available dates.
    2. Add functionality to focus on one specific animal's data.

    Args:
        satellites_dict (list[dict]): A list of dictionaries imported from `satellites.json`, 
            where each dictionary contains a bounding_box that defines the satellite coverage area.
        protected_areas_gdf (gpd.GeoDataFrame): A GeoDataFrame containing information about designated 
            protected areas.
        animal_full_gdf (gpd.GeoDataFrame): A GeoDataFrame containing information about animal activities 
            and their corresponding properties.
        show_satellites (bool): A flag indicating whether to display satellite coverage areas 
            in the visualization.
        fig (go.Figure): Optional. If provided, appends layers to the 
            existing Plotly figure.
        time (str): Specifies the date of the animal activity in the 
            format '%Y-%m-%d'.
        mapbox (dict): Optional. Customized settings for displaying the 
            figure, such as zoom and map style.

    Returns:
        Figure: The updated Plotly Figure object.
    """
    if fig is None:
        fig = go.Figure()
    
    if mapbox is None:
        mapbox=dict(
            center=dict(lat=44.4, lon=-110.6),
            zoom=8,  # Adjust the zoom level as needed
        )
    
    
    if show_satellites:
        # Add satellite area
        for data_idx, data in enumerate(satellites_dict):
            if data_idx == 1:
                # This is satellite image cover all earth
                continue
            # Extract bounding box coordinates
            bounding_box = data['bounding_box']
            lon = [bounding_box['xmin'], bounding_box['xmin'], bounding_box['xmax'], bounding_box['xmax']]
            lat = [bounding_box['ymin'], bounding_box['ymax'], bounding_box['ymax'], bounding_box['ymin']]

            # Add a filled polygon for the bounding box
            fig.add_trace(go.Scattermapbox(
                lat=lat,
                lon=lon,
                fill='toself',
                fillcolor='rgba(255, 0, 0, 0.1)', 
                line=dict(color='black'),
                name='Bounding Box'
            ))



    # Add protected area
    for _, row in protected_areas_gdf.iterrows():
        # Extract coordinates from the geometry
        lon, lat = row['geometry'].exterior.xy 

        # Convert coordinates to lists
        lon = list(lon)
        lat = list(lat)

        # Add the polygon to the figure with filled area
        fig.add_trace(go.Scattermapbox(
            lat=lat,
            lon=lon,
            fill='toself',  
            fillcolor='rgba(128, 128, 128, 0.5)',  
            line=dict(color='black'),  
            name=row.get('name', 'Area')  
        ))
    
    

    animal_full_gdf['time'] = animal_full_gdf['timestamp'].apply(lambda x: x.date())
    
    if time:
        time = pd.to_datetime(time, format='%Y-%m-%d').date()
        animal_full_gdf = animal_full_gdf[animal_full_gdf['time'] == time]

    # Add animal activities
    # Unique common names to assign colors
    unique_names = animal_full_gdf['common_name'].unique()
    color_map = {name: i for i, name in enumerate(unique_names)}

    # Add traces for scatter points colored by common_name
    for name in unique_names:
        subset = animal_full_gdf[animal_full_gdf['common_name'] == name]


        hover_text = [
            f"Animal ID: {row['animal_id']}<br>"
            f"Scientific Name: {row['scientific_name']}<br>"
            f"Redlist Category: {row['redlist_cat']}<br>"
            f"Megafauna: {row['megafauna']}<br>"
            f"Latitude: {row['latitude']}<br>"
            f"Longitude: {row['longitude']}"
            for _, row in subset.iterrows()
        ]


        fig.add_trace(go.Scattermapbox(
            lat=subset['latitude'],
            lon=subset['longitude'],
            mode='markers',
            name=name,

            marker=dict(size=10, color=color_map[name]), 
            showlegend=True,
            text=hover_text,
            hoverinfo='text'
        ))



    fig.update_layout(
        mapbox_style="open-street-map",
        mapbox=mapbox,
        
    )


    
    return(fig)

In [199]:
fig = plot_t(satellites_dict, protected_areas_gdf, animal_full_gdf, time='2024-09-01')
fig.show()

In [602]:
def plot_region(mesh_gdf, fig=None):
    """
    Visualize the design regions on a map.

    Args:
        mesh_gdf (gpd.GeoDataFrame): A GeoDataFrame containing `geometry` data corresponding to region IDs.

    Returns:
        go.Figure: The updated Plotly Figure object displaying the design regions.
    """
    
    if fig is None:
        fig = go.Figure()
    
    for _, row in mesh_gdf.iterrows():
        # Extract coordinates from the geometry
        lon, lat = row['geometry'].exterior.xy 

        # Convert coordinates to lists
        lon = list(lon)
        lat = list(lat)

        # Add the polygon to the figure with filled area
        fig.add_trace(go.Scattermapbox(
            lat=lat,
            lon=lon,
            fill='toself',  
            fillcolor='rgba(135, 206, 235, 0.5)',  
            line=dict(color='blue'),
            showlegend=False,
            name=row.get('region_id', 'Area')  
        ))
    fig.update_layout(
            mapbox_style="open-street-map"
        )
    return(fig)

In [603]:
fig = plot_region(mesh_gdf)
fig = plot(satellites_dict, protected_areas_gdf, animal_full_region_gdf, fig=fig)
fig.update_layout(
    mapbox=dict(
            center=dict(lat=44.4, lon=-110.6),
            zoom=8,  # Adjust the zoom level as needed
        )
)
fig.show(config=defualt_plotly_config)

![title](output/region_vis.png)

### Create synthetic data

The current data is too sparse to effectively illustrate the pattern.

In [561]:
def add_day(row, day):
    """
    Update `timeframe` to `day` later.
    """
    return(row['timestamp'] + pd.Timedelta(days=day))

def add_noise_to_latitude(row, column, dist, dist_arg):
    """
    add noise to the existing column
    """
    noise = dist(**dist_arg)
    return(row[column] + noise)

def generate_synthetic_data(df, dist_fnc, dist_arg, days = 3):
    """
    Create sybtheic data with updating date, latitude, and longitude
    """
    df_copy = df.copy()
    for day in range(1, days+1):
        generated_df = df_copy.copy()
        
        generated_df['timestamp'] = generated_df.apply(
            lambda x: add_day(x, day), 
            axis=1
        )
        
        generated_df['latitude'] = generated_df.apply(
            lambda x: add_noise_to_latitude(x, 'latitude', dist_fnc, dist_arg), 
            axis=1
        )
        
        generated_df['longitude'] = generated_df.apply(
            lambda x: add_noise_to_latitude(x, 'longitude', dist_fnc, dist_arg), 
            axis=1
        )
        
        df = pd.concat([df, generated_df])

    return(df)
        

In [591]:
animal_full_synthetic_df = animal_full_df.copy()
time_config = {
    'timestamp': '%Y-%m-%d %H:%M:%S'
}
animal_full_synthetic_df = date_formatter(animal_full_synthetic_df, time_config)
print(animal_full_synthetic_df.shape)
normal_arg = {'loc': 0, 'scale': 0.1}
animal_full_synthetic_df = generate_synthetic_data(animal_full_synthetic_df, np.random.normal, normal_arg, 5)
print(animal_full_synthetic_df.shape)

animal_full_synthetic_df = formatter(animal_full_synthetic_df, format_config)
animal_full_synthetic_df = str_formatter(animal_full_synthetic_df, str_config)
animal_full_synthetic_df = date_formatter(animal_full_synthetic_df, time_config)
animal_full_synthetic_df = time_extractor(animal_full_synthetic_df, 'timestamp')
animal_full_synthetic_df = geo_formatter(animal_full_synthetic_df, 'latitude', 'longitude')
animal_full_synthetic_gdf = generating_gdf(animal_full_synthetic_df)
error_checker(animal_full_synthetic_gdf, {'latitude': (-90, 90), 'longitude': (-180, 180)})
animal_full_synthetic_gdf = null_handler(animal_full_synthetic_gdf, policy='drop')

(27, 8)
(162, 8)


In [592]:
mesh_gdf = create_mesh(-111.10, 44.2, -110.10, 44.6, 10)
animal_full_synthetic_gdf = mapping_to_mesh(animal_full_synthetic_gdf, mesh_gdf)
fig = plot_region(mesh_gdf)
fig = plot(satellites_dict, protected_areas_gdf, animal_full_synthetic_gdf, fig=fig)
fig.update_layout(mapbox=dict(
            center=dict(lat=44.4, lon=-110.6),
            zoom=8,  # Adjust the zoom level as needed
        ))
fig.show(config=defualt_plotly_config)


![title](output/syntatic_vis.png)

In [606]:
fig = plot_region(mesh_gdf)
fig = plot_t(satellites_dict, protected_areas_gdf, animal_full_synthetic_gdf, fig=fig, time='2024-09-01')
fig.show(config=defualt_plotly_config)

In [607]:
fig = plot_region(mesh_gdf)
fig = plot_t(satellites_dict, protected_areas_gdf, animal_full_synthetic_gdf, fig=fig, time='2024-09-02')
fig.show(config=defualt_plotly_config)

In [581]:
# some data points is not in the designed region, hence, there is no region index
animal_full_synthetic_gdf.dropna(inplace=True) 
animal_full_synthetic_gdf.groupby(['date', 'region_id']).size()

date        region_id
2024-09-01  16.0         1
            18.0         1
            22.0         1
            25.0         1
            34.0         1
            35.0         1
            77.0         1
2024-09-02  4.0          1
            7.0          1
            19.0         1
            29.0         1
            34.0         1
            35.0         1
2024-09-03  2.0          1
            7.0          1
            13.0         1
            23.0         1
            25.0         1
            42.0         1
            52.0         1
2024-09-04  10.0         1
            23.0         1
            25.0         1
            30.0         1
            42.0         1
            52.0         1
            75.0         1
2024-09-05  24.0         2
            26.0         1
            33.0         1
            34.0         1
            78.0         1
2024-09-06  1.0          1
            14.0         1
            21.0         1
            41.0         1
      

## Discussion and models

![title](output/Cultivo_v4.png)

General concerns:
1. Time scale: 
Different research topics and use cases necessitate distinct time scales, which can range from hourly to annually. For instance, wildfires—an urgent issue in California—can rapidly degrade the environment, making hourly updates suitable. In contrast, pollution often requires a longer time scale to observe patterns effectively. Therefore, during the data flow process, data aggregation becomes essential.


2. Storage Considerations: 
Storage capacity is a critical factor, especially as research scales up. It is important to estimate storage requirements early in the process. For example, the datasets animals.csv and protected_areas.json tend to be static and will not increase significantly in size. Conversely, streamed images will demand substantial storage. For instance, if satellite images from SAT003 may require approximately 1 MB per image, resulting in a total storage requirement of around 8.5 GB per year (1 MB/image * 1 image/hour * 24 hours/day * 365 days/year). Similarly, images from wildlife cameras may require about 171 GB per year (1 MB/image * 20 images/hour * 24 hours/day * 365 days/year). In total, we might need approximately 200 GB of storage, though this figure may vary based on the limited data currently available. Monitoring these factors will be crucial for optimizing the system architecture.


3. Seasonality and Diurnal Patterns: 
Animal behavior varies with habitat and activity patterns throughout the year. For example, summer typically sees increased activity compared to winter. These seasonal variations can significantly impact model performance, underscoring the importance of having aggregated data that captures these temporal dynamics.


Data infrastructure:
1. Raw data: 
It is best practice to store all raw data in a data lake. This approach has two key advantages. First, it adds a layer of security, as it eliminates dependency on paid monitoring services, reducing the risk of data loss. Second, as new techniques and tools for data analysis emerge, storing raw data ensures the ability to revisit and extract insights with future advancements.


2. Dataflow: 
I’ve divided this process into two parts: tabular data and images. For tabular data, the standard workflow includes data formatting, error checking, and handling missing values. Next, data derivation is performed. In this specific task, I developed a function to generate different regions on a map, assigning corresponding region IDs. This helps in better aggregating results and facilitates statistical analyses, such as hotspot analysis. In the final stage, data aggregation is performed—for example, aggregating weekly data by region.

    Although this task did not involve image processing, I anticipate that satellite and wildlife camera images will require processing steps such as classification. For instance, I assume the file animal_events.csv represents derived data from wildlife cameras, which may involve image classification tasks. I've included considerations for image processing in the architecture, but I have not implemented any code for these functions due to a lack of detailed information.


3. BigQuery and further deployments:
The refined and aggregated dataset is stored in BigQuery, which allows team members to access the data using SQL, whether building models on the cloud or locally. BigQuery also integrates seamlessly with other GCP services, making it a highly efficient solution for data storage, query execution, and further deployments across cloud-based workflows. This setup ensures scalable, fast access to large datasets and enables effective collaboration across the team.


4. Visualization:
In this assignment, I utilized Plotly for visualizing the proposed model and its results. Plotly is specifically designed for interactive use; I found figures generated in an offline notebook may not display correctly. To facilitate ease of use and accessibility, I have embedded the generated figures directly within this document.



### Behavioral Analysis:

#### Determine when animals cross the boundaries of protected areas, Identify migration paths or clustering patterns.

To estimating animal activity, I propose two models for estimating animal activity areas:

1. Center Calculation Model:
This model calculates the center of all observed locations within a specified time period, serving as a proxy for the activity region. By assuming the animal distribution follows a Gaussian pattern, this model can provide insight into movement patterns. However, this assumption may introduce bias, particularly for animals with fewer observations, making it less reliable in such cases.


2. Region-Based Aggregation Model:
To address the limitations of the first model, particularly its sensitivity to outliers, aggregating the total number of observations within predefined regions may provide more accurate insights. By doing this, we can create a choropleth map and compare the distribution with the distance from protected areas.

The following two figures illustrate two distinct timeframes. Since the provided data only includes observations from September 1st, I generated synthetic data to demonstrate the validity of the model. This synthetic data was created by adding Gaussian noise to the original data from Day One, ensuring that it reflects realistic variability while still adhering to the underlying distribution of the observed values.

This approach helps validate the model's effectiveness and offers a clearer understanding of potential changes in animal populations over time. By visualizing both the actual and synthetic data, we can better assess trends and anomalies that may indicate risk zones for wildlife.

Both models require careful tuning of hyperparameters. For example, the design of the regions and the time intervals between observations are critical factors that need thoughtful consideration to ensure the accuracy of the results.

![title](output/0901.png)
![title](output/0902.png)

#### Analyze potential factors contributing to these crossings (e.g., time, weather, or environmental changes).

In addition to considering time and weather factors, here are several other elements to integrate into the data pipeline:

1. Land cover and usage, including human activities and pollution.
2. Forest information related to vegetation and biodiversity.
3. Water area data, such as lakes, rivers, and other water bodies.
4. Wildfire updates, crucial for areas prone to fire risks.
5. Soil characteristics, important for understanding ecosystem health.

These data sources offer valuable insights and can be integrated into the pipeline. Many of these can be sourced from open data platforms or paid services, such as Open Land Use or Global Surface Water Explorer, to enhance the analysis.

#### Calculate the total number of animal entries and exits from protected areas over time.

For megafauna and animals with larger body sizes, such as bison, satellite imagery can yield more precise estimates of their activity and range. Their larger physical size makes them more detectable through satellite-based observations. This data can be utilized to build a proxy model that estimates the overall population and activities by applying a correction factor that accounts for differences in observation density. However, it's important to note that this method introduces biases depending on the species being observed.

For smaller animals, a simpler and potentially more effective approach is to compare the number of observations in protected areas versus surrounding regions over two timeframes. The change in these observations, adjusted with a coefficient, can act as a proxy for estimating population trends. Region-based aggregation, which groups observations by geographical areas, can reveal useful trends, assuming that the observation rate remains consistent across both regions and timeframes.

### Advanced Insights:

#### Build a predictive model to anticipate future animal movements or identify risk zones for endangered species.

Given the lack of ground truth data for model calibration, one potential method for defining these risk zones involves calculating the total change in animal observations over two timeframes. A negative value in this context could signify that a region is at risk, suggesting a decline in the population.

There might be two scenarios:

1. Endangered Species Risk: In situations where one area shows a decrease in animal populations while a neighboring region experiences an increase, the area with declining numbers may not necessarily be at the highest risk for endangered species, especially if the total count remains stable. To quantitatively assess whether the populations are similar, hypothesis testing can be employed.

2. Risk Zone Assessment: Conversely, if both the measured area and its neighboring regions are experiencing declines, this scenario may indicate a higher risk zone.

This problem can be framed as both a classification and regression challenge. I would prefer a regression model to predict future observation counts, facilitating the identification of regions that show significant drops in animal numbers. However, a classification model could also categorize regions as "at risk" based on observed declines.

Input Features: The model should incorporate various input features, including animal characteristics, geographical data, demographic information, weather conditions, and time variables.

Model Training: Given that this is time series data, I would employ stratified sampling to separate the data into training (85%) and testing (15%) sets, ensuring temporal separation to prevent data leakage. For evaluating model performance, the mean square error (MSE) metric would be suitable, as it effectively gauges accuracy in predicting continuous values.
