In [None]:
!pip install earthengine-api #earth-engine Python API



In [None]:
!earthengine authenticate

To authorize access needed by Earth Engine, open the following URL in a web browser and follow the instructions. If the web browser does not start automatically, please manually browse the URL below.

    https://code.earthengine.google.com/client-auth?scopes=https%3A//www.googleapis.com/auth/earthengine%20https%3A//www.googleapis.com/auth/devstorage.full_control&request_id=p3zO0-FjxZYXcYXi-nBO_VNC2_BzSCuqswE0SUKjEHw&tc=3uJECtAYTLSKf0N1ekow_fVgwKCyvspMis15ELTE6I4&cc=N3TpULXl6_0dHQTMqhPaMAnurm3rMr1oLrKTzHUCpbw

The authorization workflow will generate a code, which you should paste in the box below.
Enter verification code: 4/1AfJohXnTjefF2NVjKPhbcP10Ax1bZw0tun1AWkGJZPnX1XaCi52gJJfM6zI

Successfully saved authorization token.


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from google.colab import auth
auth.authenticate_user()

In [None]:
from geopandas import geopandas
from shapely.geometry import LineString
import json
import ee
import numpy as np
ee.Initialize()
road_df = geopandas.read_file("/content/drive/MyDrive/bt_roads.json")
geo_json = road_df.to_json()
elevation = ee.Image("USGS/SRTMGL1_003")
road_df["lineString"] = road_df.apply(lambda r: LineString(r.geometry.geoms[0].coords), axis=1)

In [None]:
def calculate_angle(p0, p1, p2):
    a = np.array(p0)
    b = np.array(p1)
    c = np.array(p2)

    ba = a - b
    bc = c - b

    cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc))

    # Clamp the value to the domain of arccos
    cosine_angle = np.clip(cosine_angle, -1, 1)

    angle = np.arccos(cosine_angle)

    return np.degrees(angle)

def calculate_slope(cur_coords):
  ee_multipolygon = ee.Geometry.MultiPolygon(LineString(cur_coords).buffer(0.0005).__geo_interface__["coordinates"])

  # Calculate the average elevation within the MultiPolygon
  elevation_stats = elevation.reduceRegion(
      reducer=ee.Reducer.mean(),
      geometry=ee_multipolygon,
      scale=30,  # Specify the scale (in meters)
  )

  # Calculate the slope within the MultiPolygon
  slope = ee.Terrain.slope(elevation)
  slope_stats = slope.reduceRegion(
      reducer=ee.Reducer.mean(),
      geometry=ee_multipolygon,
      scale=30,  # Specify the scale (in meters)
  )

  average_slope = round(slope_stats.get("slope").getInfo()  , 1)
  # Clip elevation data to the country boundary
  clipped_elevation = elevation.clip(ee_multipolygon)

  # Calculate slope
  slope = ee.Terrain.slope(clipped_elevation)

  # Define a threshold for slope (40 degrees)
  slope_threshold = 0

  # Create a binary image where slope > 40 is 1, else 0
  slope_gt_threshold = slope.gt(slope_threshold)

  # Check if there are any pixels with slope > 40
  any_pixels_gt_40 = slope_gt_threshold.reduceRegion(
      reducer=ee.Reducer.mean(),
      geometry=ee_multipolygon,
      scale=30,  # Adjust scale as needed
      maxPixels=1e9  # Adjust maxPixels as needed
  )

  # Print the result
  average_slope = round(slope_stats.get("slope").getInfo()  , 1)

  return average_slope


# def calculate_slope(cur_coords):
#   ee_multipolygon = ee.Geometry.MultiPolygon(LineString(cur_coords).buffer(0.005, single_sided=True).__geo_interface__["coordinates"])
#   ee_multipolygon_1 = ee.Geometry.MultiPolygon(LineString(cur_coords).buffer(-0.005, single_sided=True).__geo_interface__["coordinates"])

#   # Calculate the average elevation within the MultiPolygon
#   elevation_stats = elevation.reduceRegion(
#       reducer=ee.Reducer.mean(),
#       geometry=ee_multipolygon,
#       scale=30,  # Specify the scale (in meters)
#   )

#   elevation_stats_1 = elevation.reduceRegion(
#       reducer=ee.Reducer.mean(),
#       geometry=ee_multipolygon_1,
#       scale=30,  # Specify the scale (in meters)
#   )

#   if elevation_stats_1.get("elevation").getInfo() > elevation_stats.get("elevation").getInfo():

#     elevation_stats = elevation_stats_1
#     ee_multipolygon = ee_multipolygon_1

#   # Calculate the slope within the MultiPolygon
#   slope = ee.Terrain.slope(elevation)
#   slope_stats = slope.reduceRegion(
#       reducer=ee.Reducer.mean(),
#       geometry=ee_multipolygon,
#       scale=30,  # Specify the scale (in meters)
#   )

  # average_slope = round(slope_stats.get("slope").getInfo()  , 1)
  # if average_slope > 40:
  #   print(average_slope)

  # return average_slope


In [None]:
landslide_marker = {}
visited_slopes = []
from tqdm import tqdm


for lstring in tqdm(road_df["lineString"]):
  coords = lstring.coords
  i = 1
  for i in range(0, len(coords), 50):
    try:
      slope = calculate_slope(coords[i: i+50])
      print(slope)
    except:
      continue
    if slope > 25:

      visited_slopes.append(coords[i])
      landslide_marker[str(coords[i: i+50])] = slope
      break


In [None]:
import json

with open('/content/drive/MyDrive/bt_slopes.json', 'w') as file:
    json.dump(ls_markers, file)

curvy_marker_str_keys = {str(key): value for key, value in landslide_marker.items()}

with open('/content/drive/MyDrive/bt_curves_slopes.json', 'w') as file:
    json.dump(curvy_marker_str_keys, file)

In [None]:
ls_markers = []
for i in visited_slopes:
  item = {}
  item["long"] = i[0]
  item["lat"] = i[1]

  ls_markers.append(item)

In [None]:
curvy_marker = {}
visited_corners = []
from tqdm import tqdm

def get_route(currCoords, i):
  curr_path = []
  for i in range(i, len(coords)-1):
    p0 = coords[i-1]
    p1 = coords[i]
    p2 = coords[i+1]
    angle = calculate_angle(p0, p1, p2)
    if 40 <= angle <= 140:
      curr_path.append(list(p1))
      curvy_marker[p1] = angle
    else:
      return curr_path, i
  return curr_path, i


for lstring in tqdm(road_df["lineString"]):
  coords = lstring.coords
  i = 1
  while i < len(coords)-1:
    path, i = get_route(coords, i)
    if len(path) > 0:
      visited_corners.append(path)
      i +=50
    i+=1

In [None]:
for lstring in tqdm(road_df["lineString"]):
  j = lstring
  break

In [None]:
import json

with open('/content/drive/MyDrive/bt_curves.json', 'w') as file:
    json.dump(curved_markers, file)

curvy_marker_str_keys = {str(key): value for key, value in curvy_marker.items()}

with open('/content/drive/MyDrive/bt_curves_angle.json', 'w') as file:
    json.dump(curvy_marker_str_keys, file)

In [None]:
curved_markers = []
for i in visited_corners:
  item = {}
  item["long"] = i[0][0]
  item["lat"] = i[0][1]
  item["markerType"] = "curveMarker"

  curved_markers.append(item)

In [None]:
import ee

# Initialize Earth Engine
ee.Initialize()

# Load elevation data (SRTM) and country boundary
elevation = ee.Image("USGS/SRTMGL1_003")
ee_multipolygon = ee.Geometry.MultiPolygon(j.buffer(0.0005).__geo_interface__["coordinates"])

# Clip elevation data to the country boundary
clipped_elevation = elevation.clip(ee_multipolygon)

# Calculate slope
slope = ee.Terrain.slope(clipped_elevation)

# Define a threshold for slope (40 degrees)
slope_threshold = 0

# Create a binary image where slope > 40 is 1, else 0
slope_gt_threshold = slope.gt(slope_threshold)

# Check if there are any pixels with slope > 40
any_pixels_gt_40 = slope_gt_threshold.reduceRegion(
    reducer=ee.Reducer.anyNonZero(),
    geometry=ee_multipolygon,
    scale=30,  # Adjust scale as needed
    maxPixels=1e9  # Adjust maxPixels as needed
)

# Print the result
print("Are there any pixels with slope > 40?", any_pixels_gt_40.get('slope').getInfo())
