In [None]:
!pip install requests tqdm geopandas rasterio

In [1]:
# local dev env cruft
import sys
sys.path.insert(0, '/Users/sfitch/Coding/earthai/src/main/python/')

In [2]:
from earthai import *
from pyrasterframes.rasterfunctions import *
import geomesa_pyspark.types
from earthai import earth_ondemand

import pyrasterframes
# spark = pyrasterframes.get_spark_session()
from pyspark.sql.functions import lit, rand, when, col, array
from pyspark.sql import SparkSession
from pyrasterframes import utils

spark = SparkSession.builder \
            .master('local[*]') \
            .config('spark.driver.memory', '12g') \
            .config('spark.jars', pyrasterframes.utils.find_pyrasterframes_assembly()) \
            .config('spark.serializer',	'org.apache.spark.serializer.KryoSerializer') \
            .config('spark.kryoserializer.buffer.max', '2047m') \
            .getOrCreate() 
spark.withRasterFrames()

# LandSat Crop Classification Model

### Pull Landsat8 from EOD

In [9]:
eod = earth_ondemand.read_catalog(
    geo=[-97.1, 47.4, -97.08, 47.5],
    max_cloud_cover=10,
    collections='landsat8_l1tp',
    start_datetime='2018-07-01T00:00:00',
    end_datetime='2018-08-31T23:59:59'
)

100%|██████████| 3/3 [00:00<00:00, 16.33it/s]


In [10]:
scene_df = eod[eod.eod_grid_id == "WRS2-030027"]

In [11]:
print(len(scene_df))
teh_scene = scene_df.iloc[0].B4
teh_scene

1


'https://landsat-pds.s3.us-west-2.amazonaws.com/c1/L8/030/027/LC08_L1TP_030027_20180717_20180730_01_T1/LC08_L1TP_030027_20180717_20180730_01_T1_B4.TIF'

### Munge crop target

```bash


aws s3 cp s3://s22s-sanda/sar-crop/target/scene_30_27_target.tif /tmp

gdalinfo /vsicurl/https://landsat-pds.s3.us-west-2.amazonaws.com/c1/L8/030/027/LC08_L1TP_030027_20180717_20180730_01_T1/LC08_L1TP_030027_20180717_20180730_01_T1_B4.TIF

gdalwarp -t_srs "+proj=utm +zone=14 +datum=WGS84 +units=m +no_defs " \
    -te  528885.000 5138685.000 760815.000 5373915.000 \
    -te_srs "+proj=utm +zone=14 +datum=WGS84 +units=m +no_defs " \
    -tr 30.0 30.0 \
    -co TILED=YES -co COPY_SRC_OVERVIEWS=YES -co COMPRESS=DEFLATE \
    scene_30_27_target.tif scene_30_27_target_utm.tif
 
```

## Create and Read Raster Catalogue

In [13]:
scene_df['target'] = 'file:///tmp/scene_30_27_target_utm.tif'
scene_df

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  """Entry point for launching an IPython kernel.


Unnamed: 0,eod_collection_display_name,eod_collection_family,eod_collection_family_display_name,eod_grid_id,created,datetime,eo_cloud_cover,eo_constellation,eo_epsg,eo_gsd,...,B2,BQA,B4,B1,B8,B11,collection,geometry,id,target
0,Landsat 8,landsat8,Landsat 8,WRS2-030027,2019-08-19T20:54:33.413548Z,2018-07-17T17:15:57.1536740Z,1.49,landsat-8,32614,30.0,...,https://landsat-pds.s3.us-west-2.amazonaws.com...,https://landsat-pds.s3.us-west-2.amazonaws.com...,https://landsat-pds.s3.us-west-2.amazonaws.com...,https://landsat-pds.s3.us-west-2.amazonaws.com...,https://landsat-pds.s3.us-west-2.amazonaws.com...,https://landsat-pds.s3.us-west-2.amazonaws.com...,landsat8_l1tp,(POLYGON ((-98.62404379679178 46.4012557977134...,LC08_L1TP_030027_20180717_20180730_01_T1_L1TP,file:///tmp/scene_30_27_target_utm.tif


In [15]:
features_rf = spark.read.raster(    
    catalog=scene_df,
    catalog_col_names=['B1', 'B2', 'B3', 'B4', 'B5', 'B6', 'B7', 'BQA', 'target'],
    tile_dimensions=(256, 256)
).repartition(200)

### Feature creation

In [16]:
features_rf = features_rf.withColumn('ndvi', rf_normalized_difference(features_rf.B5, features_rf.B4)) \
       .withColumn('ndwi1', rf_normalized_difference(features_rf.B5, features_rf.B6)) \
       .withColumn('ndwi2', rf_normalized_difference(features_rf.B5, features_rf.B7)) \
       .withColumn('ndwi3', rf_normalized_difference(features_rf.B3, features_rf.B5)) \
       .withColumn('evi', rf_local_multiply(rf_local_divide(rf_local_subtract(features_rf.B5, features_rf.B4), rf_local_add(rf_local_subtract(rf_local_add(features_rf.B5, rf_local_multiply(features_rf.B4, lit(6.0))), rf_local_multiply(features_rf.B2, lit(7.5))), lit(1.0))), lit(2.5))) \
       .withColumn('savi', rf_local_multiply(rf_local_divide(rf_local_subtract(features_rf.B5, features_rf.B4), rf_local_add(rf_local_add(features_rf.B5, features_rf.B4), lit(0.5))), lit(1.5))) \
       .withColumn('osavi', rf_local_divide(rf_local_subtract(features_rf.B5, features_rf.B4), rf_local_add(rf_local_add(features_rf.B5, features_rf.B4), lit(0.16)))) \
       .withColumn('satvi', rf_local_subtract(rf_local_multiply(rf_local_divide(rf_local_subtract(features_rf.B6, features_rf.B4),rf_local_add(rf_local_add(features_rf.B6, features_rf.B4), lit(0.5))), lit(1.5)), rf_local_divide(features_rf.B7, lit(2.0)))) \
       .withColumn('mean_swir', rf_local_divide(rf_local_add(features_rf.B6, features_rf.B7), lit(2.0))) \
       .withColumn('vli', rf_local_divide(rf_local_add(rf_local_add(rf_local_add(features_rf.B1, features_rf.B2), features_rf.B3), features_rf.B4), lit(4.0))) \
       .withColumn('dbsi', rf_local_subtract(rf_normalized_difference(features_rf.B6, features_rf.B3), rf_normalized_difference(features_rf.B5, features_rf.B4)))

features_rf = features_rf.select(
    features_rf.target,
    rf_crs(features_rf.B1).alias('crs'),
    rf_extent(features_rf.B1).alias('extent'),
    rf_tile(features_rf.B1).alias('coastal'),
    rf_tile(features_rf.B2).alias('blue'),
    rf_tile(features_rf.B3).alias('green'),
    rf_tile(features_rf.B4).alias('red'),
    rf_tile(features_rf.B5).alias('nir'),
    rf_tile(features_rf.B6).alias('swir1'),
    rf_tile(features_rf.B7).alias('swir2'),
    rf_tile(features_rf.ndvi).alias('ndvi'),
    rf_tile(features_rf.ndwi1).alias('ndwi1'),
    rf_tile(features_rf.ndwi2).alias('ndwi2'),
    rf_tile(features_rf.ndwi3).alias('ndwi3'),
    rf_tile(features_rf.evi).alias('evi'),
    rf_tile(features_rf.savi).alias('savi'),
    rf_tile(features_rf.osavi).alias('osavi'),
    rf_tile(features_rf.satvi).alias('satvi'),
    rf_tile(features_rf.mean_swir).alias('mean_swir'),
    rf_tile(features_rf.vli).alias('vli'),
    rf_tile(features_rf.dbsi).alias('dbsi'),
    rf_tile(features_rf.BQA).alias('qa')
)

In [17]:
# Values of qa band indicating cloudy conditions
cloud = [2800, 2804, 2808, 2812, 6896, 6900, 6904, 6908]

mask_part = features_rf \
    .withColumn('cloud1', rf_local_equal('qa', lit(2800))) \
    .withColumn('cloud2', rf_local_equal('qa', lit(2804))) \
    .withColumn('cloud3', rf_local_equal('qa', lit(2808))) \
    .withColumn('cloud4', rf_local_equal('qa', lit(2812))) \
    .withColumn('cloud5', rf_local_equal('qa', lit(6896))) \
    .withColumn('cloud6', rf_local_equal('qa', lit(6900))) \
    .withColumn('cloud7', rf_local_equal('qa', lit(6904))) \
    .withColumn('cloud8', rf_local_equal('qa', lit(6908))) 

df_mask_inv = mask_part \
    .withColumn('mask', rf_local_add('cloud1', 'cloud2')) \
    .withColumn('mask', rf_local_add('mask', 'cloud3')) \
    .withColumn('mask', rf_local_add('mask', 'cloud4')) \
    .withColumn('mask', rf_local_add('mask', 'cloud5')) \
    .withColumn('mask', rf_local_add('mask', 'cloud6')) \
    .withColumn('mask', rf_local_add('mask', 'cloud7')) \
    .withColumn('mask', rf_local_add('mask', 'cloud8')) \
    .drop('cloud1', 'cloud2', 'cloud3', 'cloud4', 'cloud5', 'cloud6', 'cloud7', 'cloud8', 'qa')
    
# at this point the mask contains 0 for good cells and 1 for defect, etc
# convert cell type and set value 1 to NoData
# also set the value of 100 to nodata in the target. #darkarts
mask_rf = df_mask_inv.withColumn('mask', rf_with_no_data(rf_convert_cell_type('mask', 'uint8'), 1.0)) \
                     .withColumn('target', rf_with_no_data('target', 100))

mask_rf.printSchema()

root
 |-- target: struct (nullable = true)
 |    |-- tile_context: struct (nullable = false)
 |    |    |-- extent: struct (nullable = false)
 |    |    |    |-- xmin: double (nullable = false)
 |    |    |    |-- ymin: double (nullable = false)
 |    |    |    |-- xmax: double (nullable = false)
 |    |    |    |-- ymax: double (nullable = false)
 |    |    |-- crs: struct (nullable = false)
 |    |    |    |-- crsProj4: string (nullable = false)
 |    |-- tile: tile (nullable = false)
 |-- crs: struct (nullable = true)
 |    |-- crsProj4: string (nullable = false)
 |-- extent: struct (nullable = true)
 |    |-- xmin: double (nullable = false)
 |    |-- ymin: double (nullable = false)
 |    |-- xmax: double (nullable = false)
 |    |-- ymax: double (nullable = false)
 |-- coastal: tile (nullable = true)
 |-- blue: tile (nullable = true)
 |-- green: tile (nullable = true)
 |-- red: tile (nullable = true)
 |-- nir: tile (nullable = true)
 |-- swir1: tile (nullable = true)
 |-- swir2: ti

### Train/test split

In [19]:
rf = mask_rf.withColumn('train_set', when(rand(seed=1234) > 0.3, 1).otherwise(0))
train_df = rf.filter(rf.train_set == 1)
test_df = rf.filter(rf.train_set == 0)

### Create ML Pipeline

In [12]:
# exploded_df = train_df.select(rf_explode_tiles(array('coastal','blue','green','red','nir','swir1','swir2','ndvi','ndwi1','ndwi2','ndwi3','evi','savi','osavi','satvi','mean_swir','vli','dbsi','target', 'mask')))
# exploded_df.show()

In [20]:
from pyrasterframes import TileExploder
from pyrasterframes.rf_types import NoDataFilter

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

In [21]:
exploder = TileExploder()

noDataFilter = NoDataFilter() \
  .setInputCols(['target', 'mask'])

assembler = VectorAssembler() \
  .setInputCols(['coastal','blue','green','red','nir','swir1','swir2','ndvi','ndwi1','ndwi2','ndwi3','evi','savi','osavi','satvi','mean_swir','vli','dbsi']) \
  .setOutputCol("features")

classifier = DecisionTreeClassifier() \
  .setLabelCol('target') \
  .setMaxDepth(10) \
  .setFeaturesCol(assembler.getOutputCol())

pipeline = Pipeline() \
  .setStages([exploder, noDataFilter, assembler, classifier])

### Train the model

In [22]:
model = pipeline.fit(train_df)

In [None]:
#model.transform(train_df).show()

In [None]:
prediction_df = model.transform(test_df) \
                       .drop(assembler.getOutputCol()).cache()
prediction_df.printSchema()

eval = MulticlassClassificationEvaluator(
    predictionCol=classifier.getPredictionCol(),
    labelCol=classifier.getLabelCol(),
    metricName='fMeasureByThreshold'
)

f1score = eval.evaluate(prediction_df)
print("\nF1 Score:", f1score)

In [None]:
cnf_mtrx = prediction_df.groupBy(classifier.getPredictionCol()) \
    .pivot(classifier.getLabelCol()) \
    .count() \
    .sort(classifier.getPredictionCol())
cnf_mtrx