# Data Engineering Take-Home Assignment: Nature Conservation & Geospatial Data

## Context
Assume you have been hired as a Data Engineer for an organization focused on nature conservation. The organization is working on a project to monitor and protect natural habitats using satellite data, wildlife sensor data, and geospatial information. Your task is to design and implement a data pipeline that ingests, processes, and analyzes this data to help identify areas needing immediate conservation attention as well as build a model that provides helpful insights related our organization's interests.

## Objective 

Your goal in this assessment is to showcase your curiousity and creativity to design rigorous models and derive interesting insights.  

You'll be given two tasks.

The first is a design task, in which we expect you to diagram and describe how you'd set up a process to injest this data from a live streamed source, assuming you are also paying montoring services to supply this data from scratch. Think about how you might transform and store the data efficiently for querying and analysis and feed it into your model. 

The second task will require you devise interesting questions from preliminary explorations of a subset of migration data, found alongside this notebook, and construct a rigorous model to answer them. Please demonstrate all of your process using this notebook, and most importantly your outputs. 




## Tasks

### 1) Design - Data Ingestion & Storage:
- **Ingestion**: Design and implement a solution to ingest data from three different sources: GeoJSON, CSV, and JSON.
- **Automation**: Ensure the pipeline can handle regular data updates (e.g., daily or hourly).
- **Storage**: Choose appropriate storage solutions for each dataset (e.g., relational database, NoSQL, cloud storage, or data lake). Provide justification for your choices.

### 2) Data Transformation & Analysis:
- **Data Parsing & Cleaning**: 
  - Parse and clean the wildlife tracking data (CSV) and geospatial data (GeoJSON) to ensure consistency.
  - Ensure the data is ready for analysis by standardizing formats, removing errors, and handling missing values.

- **Exploratory Data Analysis**:
  - Investigate the data to understand key characteristics, distributions, and trends.

- **Behavioral Analysis**:
  - Identify more complex animal behaviors:
    - Determine when animals cross the boundaries of protected areas.
    - Analyze potential factors contributing to these crossings (e.g., time, weather, or environmental changes).
    - Calculate the total number of animal entries and exits from protected areas over time.

- **Advanced Insights**:
  - Identify migration paths or clustering patterns.
  - Build a predictive model to anticipate future animal movements or identify risk zones for endangered species.

### 3) Optional Bonus - Visualization/Reporting:
- Provide interactive visualizations to demonstrate your analysis, ideally within this notebook.

### Here are data sources you can use to build your analysis. 

- https://storage.googleapis.com/data-science-assessment/animal_events.csv
- https://storage.googleapis.com/data-science-assessment/animals.csv
- https://storage.googleapis.com/data-science-assessment/protected_areas.json
- https://storage.googleapis.com/data-science-assessment/satellites.json

## Deliverables
#### Design component:
- A clear description and diagrams for the architecture and tools you might used, including any cloud services, databases, or libraries (if applicable). During the discussion we'll go over different scenarios. 

#### Implementation:
- Code for the data pipeline that includes:
  - Data ingestion scripts or setup.
  - Transformation and processing logic.
  - Queries or outputs showcasing the results.
- (Optional) a visualization of the results.

## Data
### 1. **Animal Events - CSV** [Download link](https://storage.googleapis.com/data-science-assessment/animal_events.csv)

- Contains data on animal movement events with details like location and speed.
- **Key Columns**: `event_id`, `animal_id`, `timestamp`, `latitude`, `longitude`, `speed`.

---

### 2. **Animals - CSV** [Download link](https://storage.googleapis.com/data-science-assessment/animals.csv)

- Metadata about tracked animals, including species and conservation status.
- **Key Columns**: `animal_id`, `species`, `endangered`, `animal_type`, `preferred_landcover`.

---

### 3. **Protected Areas - GeoJSON** [Download link](https://storage.googleapis.com/data-science-assessment/protected_areas.json)

- Geospatial data representing protected areas with boundaries and metadata.
- **Key Fields**: `name`, `category`, `protected_area_id`, `geometry`.

---

### 4. **Satellite Metadata - JSON** [Download link](https://storage.googleapis.com/data-science-assessment/satellites.json)

- Metadata from satellite imagery, covering factors like cloud cover and resolution.
- **Key Fields**: `satellite_id`, `start_time`, `last_time`, `frequency`, `bounding_box`, `cloud_cover_percentage`, `resolution`.

---

## Evaluation Criteria

- **Data Engineering Skills**: How well the pipeline handles ingestion, transformation, and storage.
- **Geospatial Data Handling**: Ability to process geospatial data and perform spatial operations (e.g., joins, intersections).
- **Scalability & Efficiency**: The pipeline’s ability to handle larger datasets or more frequent updates.
- **Code Quality**: Structure, readability, and use of best practices.
- **Documentation**: Clear explanations of your approach and any assumptions made.
- **Bonus (Visualization/Reporting)**: Extra points for insightful data visualization or reporting.

## Set up

Feel free to set up this notebook using condo, or your own kernal / virtual environment. To make it easier, you can set up the notebook using this docker with the potentialy libraries you might need. 

#### To start using a prepared Docker image, 
- 1 navigate to this shared folder in your terminal, and then load up docker and run the docker file to pull in needed libraries

```bash
docker build -t geospatial-notebook .
docker run -p 8888:8888 -v $(pwd):/home/nobody/work geospatial-notebook
```


When the container runs, it will display a URL with a token (something like http://127.0.0.1:8888/?token=...). It will probably be something like http://127.0.0.1:8888/tree You can copy this URL into your browser, and you'll open to a Jupyter lab. Your existing notebook will be available inside the container under the work directory.

Anytime you want to work again, just run the following command to start the Docker container and access your notebooks:

```bash
docker run -p 8888:8888 -v $(pwd):/home/nobody/work geospatial-notebook
```


In [1]:
# Libraries you may or may not need
import pandas as pd
import geopandas as gpd
import shapely
import sqlalchemy
import psycopg2
import osgeo.gdal

### load the needed datapoints


### Before we get started, I'll explore what we have

In [2]:
import os
os.listdir("data")

['satellites.json', 'animal_events.csv', 'protected_areas.json', 'animals.csv']

In [3]:
# Read in Animal 
animals = pd.read_csv("data/animals.csv")
animals

Unnamed: 0,animal_id,common_name,scientific_name,redlist_cat,megafauna
0,A001,Wolf,Canis lupus,Least Concern,no
1,A002,Bison,Bison bison,Vulnerable,yes
2,A003,Elk,Cervus canadensis,Least Concern,yes
3,A004,Sierra Nevada bighorn sheep,Ovis canadensis sierrae,Endangered,no
4,A005,Sierra Nevada red fox,Vulpes vulpes necator,Critically Endangered,no
5,A006,Bobcat,Lynx rufus,Least Concern,yes
6,A007,Mule deer,Odocoileus hemionus,Least Concern,yes
7,A008,Desert bighorn sheep,Ovis canadensis nelsoni,Near Threatened,yes
8,A009,Gray fox,Urocyon cinereoargenteus,Least Concern,yes


In [4]:
# Read in Animal Events
animal_events = pd.read_csv("data/animal_events.csv")
animal_events.head()

#event_id and speed missing from this data.

Unnamed: 0,animal_id,timestamp,latitude,longitude
0,A001,2024-09-01 12:00:00,45.2284,-110.7622
1,A002,2024-09-01 12:00:00,44.576,-110.6763
2,A003,2024-09-01 12:00:00,44.4232,-111.1061
3,A004,2024-09-01 12:00:00,37.9058,-119.7857
4,A005,2024-09-01 12:00:00,37.7896,-119.6426


In [5]:
#Read in Protected Areas
pa = gpd.read_file("data/protected_areas.json")
#pa.is_valid
pa

Unnamed: 0,name,category,protected_area_id,geometry
0,Yellowstone National Park,National Park,PA001,"POLYGON ((-110.839 44.4488, -110.7052 44.599, ..."
1,Yosemite National Park,National Park,PA002,"POLYGON ((-119.655 37.7244, -119.5964 37.6962,..."
2,Grand Canyon National Park,National Park,PA003,"POLYGON ((-112.1861 36.1336, -112.2156 36.2331..."


In [6]:
#Read in Satellite Metadata
sm = pd.read_json("data/satellites.json")
sm

Unnamed: 0,satellite_id,start_time,last_time,frequency,bounding_box,cloud_cover_percentage,resolution
0,SAT001,2018-09-01 12:00:00+00:00,2024-09-10 12:00:00+00:00,daily,"{'xmin': -112.939131, 'ymin': 42.596356, 'xmax...",12.5,10m
1,SAT002,2004-09-01 12:00:00+00:00,2024-09-06 12:00:00+00:00,bi-weekly,"{'xmin': -180, 'ymin': -90, 'xmax': 180, 'ymax...",10.0,100m
2,SAT003,2022-09-01 12:00:00+00:00,2024-09-10 12:00:00+00:00,hourly,"{'xmin': -124.178099, 'ymin': 30.738207, 'xmax...",10.0,20m


In [7]:
sm.bounding_box.values[0]

{'xmin': -112.939131,
 'ymin': 42.596356,
 'xmax': -107.048726,
 'ymax': 46.142424}

In [8]:
xmins = [x['xmin'] for x in sm.bounding_box.values]
ymins = [x['ymin'] for x in sm.bounding_box.values]
xmaxs = [x['xmax'] for x in sm.bounding_box.values]
ymaxs = [x['ymax'] for x in sm.bounding_box.values]

In [9]:
from shapely.geometry import box
geometry = [box(x1, y1, x2, y2) for x1,y1,x2,y2 in zip(xmins, ymins, xmaxs, ymaxs)]
#Building geometries from the bounding boxes - this can help us find overlap between animal_events and satellite imagery.
sm = gpd.GeoDataFrame(sm, geometry=geometry)



# 1. Design - Data Ingestion & Storage

#### `animal_events`

##### 1. Batch Process animal events and write/upload to S3 `raw-animal-events`.   These will be streamed from the animal trackers and may or may not have service at any given point in time.  For most use cases, we won't need live data.  

**One case where live data may be useful is if a protected area closely neighbors ranch/livestock.  There may be cases where we would want to notify local wildlife departments if for example wolves leave a protected area and are approaching a cattle ranch.  Human intervention to push them back into the park may be beneficial.  

##### 2. Either Hourly/Daily (depending on how much data we are actually uploading) - take all unprocessed files except the most recent file (this may be actively being written to so we don't want to screw this up), clean and validate the data then update `animal_events` database with new entries.  

`animal_id` should already exist and be part of a known list of tracked animals. 

timestamp should be in a standardized format and cleaned.

lat and longitude should have 6 decimal places for as accurate locations as possible.  Our current dataset only has 4 so that's fine for now.

##### 3.  Once new entries are confirmed in the animal_events database, mark file as processed or move to processed bucket in S3.

This can be automated in Apache Airflow, Dagster (what I worked with most recently), etc.

In the case where live data is useful for deterring wildlife, anytime new data comes in, we could set a trigger when the file is changed to check the most recent data against protected areas. 


In [31]:
# Note that I did not set up the databases for this exercise at this time, 
# but if you'd like me to I can go back and do that.  

def validate_animal_id_exists(animal_id: str, conn_str: str) -> bool:
    """
    Check the animals table to sure the incoming animal_id from the
    animal event is valid.

    animal_id (str): animal_id of the event that should exist and join to the animals table
    conn_str (str): the connection string to the database
    """
    
    conn = psycopg2.connect(conn_str)

    QUERY = """SELECT EXISTS(SELECT 1 FROM animals WHERE animal_id=(%s)"""

    with conn:
        with conn.cursor() as curs:
            curs.execute(QUERY, (animal_id,))
            animal_id_exists = curs.fetchone()[0]
    # leaving contexts doesn't close the connection
    conn.close()
    return animal_id_exists

#### `protected_areas`
##### 1. These files/geojsons will be processed on upload so we'll need a trigger event.  We can dump these into s3 and as soon as a file is updated, we can run the geojson through the geojson validator to make sure the coordinate system and the geojson is standardized and accurate.  This will only need to be done once and likely won't be triggered often since there will be a finite number of protected areas.



In [33]:
import geojson_validator

def validate_geojson(geojson_input: dict) -> dict:
    """
    An example of geojson validator we can use. Or, we can use geopandas `is_valid`
    """
    geojson_validator.validate_structure(geojson_input, check_crs=True)
    geojson_validator.validate_geometries(geojson_input)
    geojson_output = geojson_validator.fix_geometries(geojson_input)
    return geojson_output

In [12]:
# import json
# from jsonschema import validate

##### 

# 2. Data Transformation & Analysis

## Data Parsing and Cleaning

In [13]:
DEFAULT_CRS = 4326

In [34]:
def convert_dataframe_to_geodataframe(df: pd.DataFrame, lat_col: str = 'latitude', lon_col: str = 'longitude') -> gpd.GeoDataFrame:
    """
    Takes a datafarme with a latitude value and a longitude value and convert it to a geodataframe.  
    This makes geospatial operations easier.  

    df (pd.DataFrame): dataframe with a latitude value and a longitude value
    lat_col (str): name of the latitude column
    lon_col (str): name of the longitude column

    Returns a geodataframe with a geometry column.
    """
    gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df[lon_col], df[lat_col]), crs=DEFAULT_CRS)
    del gdf['latitude']
    del gdf['longitude']
    return gdf

from datetime import datetime
def parse_timestamp(datetime_str:str)->datetime:
    return datetime.strptime(datetime_str, '%Y-%m-%d %H:%M:%S')

### Animal Events

In [15]:
animal_events = pd.read_csv("data/animal_events.csv")
animal_events = convert_dataframe_to_geodataframe(animal_events)
animal_events['timestamp'] = animal_events['timestamp'].map(lambda x: parse_timestamp(x))

### Protected Areas

In [16]:
protected_areas = gpd.read_file("data/protected_areas.json")
protected_areas.set_crs(DEFAULT_CRS, inplace=True)

Unnamed: 0,name,category,protected_area_id,geometry
0,Yellowstone National Park,National Park,PA001,"POLYGON ((-110.839 44.4488, -110.7052 44.599, ..."
1,Yosemite National Park,National Park,PA002,"POLYGON ((-119.655 37.7244, -119.5964 37.6962,..."
2,Grand Canyon National Park,National Park,PA003,"POLYGON ((-112.1861 36.1336, -112.2156 36.2331..."


## Exploratory Data Analysis

In [35]:
# We'll want to identify when animals enter and exit protected areas of the park.  
# Animals that live in Yellowstone will most likely never naturally enter a park in Yosemite or Grand Canyon (although it's possible).  
# For this purpose we will see if an individual is in any of our protected areas, but we can implement more intelligent logic to only check 
# in high probability areas/neighboring areas. 

# Note: looks like the protected area in Yellowstone is only a portion of the actual Yellowstone boundaries.  For this purpose we will assume
# that the protected area is just the sub section.  

In [36]:
from shapely.geometry import Point, Polygon
def in_protected_area(location: gpd.GeoDataFrame, protected_area: Polygon) -> bool:
    """
    Takes a geodataframe containing Points (lat/long) and a Polygon and returns True/False for each
    Point if the point is within the given polygon
    """
    return protected_area.contains(location)

In [37]:
# Let's look at movement of a tagged wolf.  The timestamp is color coded so the lighter/more white is earlier and the point gets darker
# as the animal moves.  This allows us to track direction.
animal1 = animal_events[animal_events['animal_id'] =='A001']
animal_map = animal1.explore(column='timestamp',cmap="binary",style_kwds={"style_function":lambda x: {"radius":5}})
yellowstone = protected_areas[protected_areas['name']=='Yellowstone National Park']
yellowstone.explore(m=animal_map)

In [38]:
### All Animals
animal_map = animal_events.explore(column='animal_id',style_kwds={"style_function":lambda x: {"radius":5}})
pa.explore(m=animal_map)

### Find Animals Entering and Exiting Protected Areas

In [39]:
# these aren't complex geometries so this works for now.  
# We'll want to find the most likely protected areas per each species/population to run this analysis on.  
union_protected_area = protected_areas.union_all()

In [22]:
animal_events = animal_events.sort_values(['animal_id','timestamp'])
animal_events['protected_area'] = animal_events['geometry'].map(lambda x: in_protected_area(x, union_protected_area))
#diff will track when an animal moves from within protected area boundaries.  If the value is 0, the animal hasn't changed location category,
#but if the value is  +1, the animal has entered a protected area and if the value is -1 then the animal has left the protected area.
animal_events['changed_area'] = animal_events[['animal_id', 'protected_area']].groupby('animal_id').diff()

In [23]:
animals_entering_pa = animal_events[animal_events['changed_area']==1]
animals_exiting_pa = animal_events[animal_events['changed_area']==-1]

In [24]:
animals_entering_pa
# 2 animals entering the protected area during these couple hours

Unnamed: 0,animal_id,timestamp,geometry,protected_area,changed_area
21,A004,2024-09-01 14:00:00,POINT (-119.5353 37.8601),True,1
24,A007,2024-09-01 14:00:00,POINT (-112.1831 36.2261),True,1


In [25]:
animals_exiting_pa
# 1 animal exiting the protected area during these couple hours

Unnamed: 0,animal_id,timestamp,geometry,protected_area,changed_area
19,A002,2024-09-01 14:00:00,POINT (-110.8182 44.2221),False,-1


In [26]:
#For tracking animal behavior, we'll need more observations to understanding daily and seasonal behaviors.  
#For each animal we only have 3 timestamps so it's difficult to determine overall behavior pattern.   Additionally, we'll need to consider
#specific species behaviors.  We know wolf packs have very specific territorities so we can't apply a model built for Wolf Pack 1 on any
#wolves in Wolf Pack 2 since their territories minimally overlap.  




### Compute Speed 

In [27]:
#Speed was not included in our raw data file, but we can compute it based on the animal event information we have.  W
CRS_FOR_METERS = 3763
animal_events.to_crs(CRS_FOR_METERS, inplace=True)

In [28]:
def compute_distance(point1: Point, point2: Point)->float:
    """ Takes two points and returns the distance
    """
    return point1.distance(point2)

In [41]:
from datetime import timedelta
def compute_distance_traveled_and_speed (df:gpd.GeoDataFrame)->gpd.GeoDataFrame:
    """
    Computes the distance traveled between each timestamp and the average speed during
    that time interval.  
    """
    nanoseconds_in_hour = 3600000000000
    meters_per_kilometer = 1000
    timestamps = df['timestamp'].values

    
    df['time_delta'] = [-1] + [timestamps[i+1] - timestamps[i] for i in range(len(timestamps)-1)]
    df['time_delta_hours'] = df['time_delta'].map(lambda x: int(x/nanoseconds_in_hour)) # nanoseconds to hours
    locations = df.geometry.values
    df['distance'] = [-1] + [compute_distance(locations[i+1],locations[i]) for i in range(len(timestamps)-1)]
    df['speed'] = df['distance']/df['time_delta_hours']/meters_per_kilometer #kph

    del df['time_delta']
    del df['animal_id']
    return df
    

animal_events = animal_events.groupby('animal_id').apply(lambda x: compute_distance_traveled_and_speed(x)).reset_index()
del animal_events['level_1']

  animal_events = animal_events.groupby('animal_id').apply(lambda x: compute_distance_traveled_and_speed(x)).reset_index()


In [44]:
animal_events

# 128kph for wolf A001 seems way too fast so this could potentially be suspcious and something we'd want to look into.  -inf we can ignore 
# since this value is equivalent to null.

# We can use this speed combined with the direction of the animal to determine where they may go next.  Note that if the animal is moving really
# fast for a prolonged period of time, they most likely will slow down rather than continuing at that velocity.  

# Based on the average speed, we could estimate the time an animal enters and exists the park.

Unnamed: 0,animal_id,timestamp,geometry,protected_area,changed_area,time_delta_hours,distance,speed
0,A001,2024-09-01 12:00:00,POINT (-5381319.455 6971999.452),False,,0,-1.0,-inf
1,A001,2024-09-01 13:00:00,POINT (-5501549.44 7017045.055),False,0.0,1,128391.416045,128.391416
2,A001,2024-09-01 14:00:00,POINT (-5520558.842 6940376.552),False,0.0,1,78989.979017,78.989979
3,A002,2024-09-01 12:00:00,POINT (-5479712.708 6993193.071),True,,0,-1.0,-inf
4,A002,2024-09-01 13:00:00,POINT (-5484538.472 6998369.324),True,0.0,1,7076.834453,7.076834
5,A002,2024-09-01 14:00:00,POINT (-5527126.909 7024840.613),False,-1.0,1,50144.831441,50.144831
6,A003,2024-09-01 12:00:00,POINT (-5487615.326 7045660.662),False,,0,-1.0,-inf
7,A003,2024-09-01 13:00:00,POINT (-5480175.792 7006429.161),False,0.0,1,39930.656716,39.930657
8,A003,2024-09-01 14:00:00,POINT (-5473000.239 7026372.49),False,0.0,1,21194.927022,21.194927
9,A004,2024-09-01 12:00:00,POINT (-5974333.858 8429832.313),False,,0,-1.0,-inf
