# General Outline

## Read an MTL

## Create a Blob

## Format the blob as a dataset "doc"

## Reformat for Elastic

## Send to elastic as part of a datalake or cube record

# Code for all of this exists - although not always easy to find

---
---
---

# def s3_object_to_raw(bucket, object_prefix):

In [1]:
import logging
import boto3

def s3_object_to_raw(bucket, object_prefix):
    logging.info("getting raw %s %s", bucket, object_prefix)
    
    s3 = boto3.resource("s3")
    bucket_name = bucket
    path = object_prefix
    logging.info("Processing %s", path)
    key = path
    obj = s3.Object(bucket_name, key).get(ResponseCacheControl='no-cache')
    raw = obj['Body'].read().decode('utf8')
    print(path)
    return raw

    
    

In [2]:
bucket_name = 'deafrica-usgs-c2-data'
path='usgs_ls8c_level2_2/157/070/2015/11/08/usgs_ls8c_level2_2-0-20190911_157070_2015-11-08.odc-metadata.yaml'

my_raw = s3_object_to_raw(bucket_name, path)

my_raw

usgs_ls8c_level2_2/157/070/2015/11/08/usgs_ls8c_level2_2-0-20190911_157070_2015-11-08.odc-metadata.yaml


"---\n# Dataset\n$schema: https://schemas.opendatacube.org/dataset\nid: 3d5d8b9c-c284-56b5-9b5f-a7567962d392\n\nlabel: usgs_ls8c_level2_2-0-20190911_157070_2015-11-08\nproduct:\n  name: usgs_ls8c_level2_2\n  href: https://collections.dea.ga.gov.au/product/usgs_ls8c_level2_2\n\ncrs: epsg:32639\ngeometry:\n  type: Polygon\n  coordinates: [[[241785.0, -1481685.0], [241785.0, -1715415.0], [482415.0, -1715415.0],\n      [482415.0, -1481685.0], [241785.0, -1481685.0]]]\ngrids:\n  default:\n    shape: [7791, 8021]\n    transform: [30.0, 0.0, 241785.0, 0.0, -30.0, -1481685.0, 0.0, 0.0, 1.0]\n\nproperties:\n  datetime: 2015-11-08 06:38:57.875828Z\n  eo:cloud_cover: 68.37\n  eo:gsd: 30.0  # Ground sample distance (m)\n  eo:instrument: OLI_TIRS\n  eo:platform: landsat-8\n  eo:sun_azimuth: 98.06121879\n  eo:sun_elevation: 64.46336127\n  landsat:collection_number: 2\n  landsat:geometric_rmse_model_x: 4.688\n  landsat:geometric_rmse_model_y: 6.009\n  landsat:geometric_rmse_verify: 7.144\n  landsat:g

In [3]:
import re

MTL_PAIRS_RE = re.compile(r'(\w+)\s=\s(.*)')

def _parse_value(s):
    s = s.strip('"')
    for parser in [int, float]:
        try:
            return parser(s)
        except ValueError:
            pass
    return s


def _parse_group(lines):
    tree = {}
    for line in lines:
        match = MTL_PAIRS_RE.findall(line)
        if match:
            key, value = match[0]
            if key == 'GROUP':
                tree[value] = _parse_group(lines)
            elif key == 'END_GROUP':
                break
            else:
                tree[key] = _parse_value(value)
    return tree

In [4]:
bucket_name = 'deafrica-usgs-c2-data'
path='usgs_ls8c_level2_2/157/070/2015/11/08/LC08_L2SP_157070_20151108_20190911_02_T1_MTL.txt'

my_raw = s3_object_to_raw(bucket_name, path)

raw_MTL = _parse_group(iter(my_raw.split("\n")))['LANDSAT_METADATA_FILE']

raw_MTL

usgs_ls8c_level2_2/157/070/2015/11/08/LC08_L2SP_157070_20151108_20190911_02_T1_MTL.txt


{'PRODUCT_CONTENTS': {'ORIGIN': 'Image courtesy of the U.S. Geological Survey',
  'DIGITAL_OBJECT_IDENTIFIER': 'https://doi.org/10.5066/F78S4MZJ',
  'LANDSAT_PRODUCT_ID': 'LC08_L2SP_157070_20151108_20190911_02_T1',
  'PROCESSING_LEVEL': 'L2SP',
  'COLLECTION_NUMBER': 2,
  'COLLECTION_CATEGORY': 'T1',
  'OUTPUT_FORMAT': 'GEOTIFF',
  'FILE_NAME_BAND_1': 'LC08_L2SP_157070_20151108_20190911_02_T1_SR_B1.TIF',
  'FILE_NAME_BAND_2': 'LC08_L2SP_157070_20151108_20190911_02_T1_SR_B2.TIF',
  'FILE_NAME_BAND_3': 'LC08_L2SP_157070_20151108_20190911_02_T1_SR_B3.TIF',
  'FILE_NAME_BAND_4': 'LC08_L2SP_157070_20151108_20190911_02_T1_SR_B4.TIF',
  'FILE_NAME_BAND_5': 'LC08_L2SP_157070_20151108_20190911_02_T1_SR_B5.TIF',
  'FILE_NAME_BAND_6': 'LC08_L2SP_157070_20151108_20190911_02_T1_SR_B6.TIF',
  'FILE_NAME_BAND_7': 'LC08_L2SP_157070_20151108_20190911_02_T1_SR_B7.TIF',
  'FILE_NAME_BAND_ST_B10': 'LC08_L2SP_157070_20151108_20190911_02_T1_ST_B10.TIF',
  'FILE_NAME_THERMAL_RADIANCE': 'LC08_L2SP_157070_2015

# Future 
# def satellite_ref(sat):

In [5]:
bands_ls8_espa = [('1', 'sr_band1'),
             ('2', 'blue'),
             ('3', 'green'),
             ('4', 'red'),
             ('5', 'nir'),
             ('6', 'swir1'),
             ('7', 'swir2'),
             ]

band_file_map = {
                  'sr_band1' : 'sr_band1',
                  'blue' : 'sr_band2',
                  'green' : 'sr_band3',
                  'red' : 'sr_band4',
                  'nir' : 'sr_band5',
                  'swir1' : 'sr_band6',
                  'swir2' : 'sr_band7',
                  'therm' : 'bt_band10',
                  'pixel_qa' : 'pixel_qa',
                }

def satellite_ref(sat):
    """
    load the band_names for referencing LANDSAT8  USARD
    """
    if sat == 'LANDSAT_8':
        sat_img = bands_ls8_espa
        prod_type = 'espa'
    else:
        raise ValueError('Satellite data Not Supported')
    return sat_img, prod_type

def get_band_file_map(item):
    return band_file_map[item]


In [6]:
from osgeo import osr


"""

MTL metadata parsing

This module contains the MetaBlob class for holding a dictionary (dict) of satellite metadata
read from a .MTL file created by someone in espaLand

more to come ...

watch this space ...

"""

def get_geo_ref_points(info):
    return {
        'ul': {'x': info['CORNER_UL_PROJECTION_X_PRODUCT'], 'y': info['CORNER_UL_PROJECTION_Y_PRODUCT']},
        'ur': {'x': info['CORNER_UR_PROJECTION_X_PRODUCT'], 'y': info['CORNER_UR_PROJECTION_Y_PRODUCT']},
        'll': {'x': info['CORNER_LL_PROJECTION_X_PRODUCT'], 'y': info['CORNER_LL_PROJECTION_Y_PRODUCT']},
        'lr': {'x': info['CORNER_LR_PROJECTION_X_PRODUCT'], 'y': info['CORNER_LR_PROJECTION_Y_PRODUCT']},
    }


def get_coords(geo_ref_points, spatial_ref):
    t = osr.CoordinateTransformation(spatial_ref, spatial_ref.CloneGeogCS())

    def transform(p):
        lon, lat, z = t.TransformPoint(p['x'], p['y'])
        return {'lon': lon, 'lat': lat}

    return {key: transform(p) for key, p in geo_ref_points.items()}

# import re
import pprint
# from xml.etree import ElementTree

class MetaBlob:


    def __init__(self, directory, raw_MTL):

        self.directory = directory
        self.mtlstring = raw_MTL
        self.set_global_metadata()
        self.set_geography_coords()
        # self.set_projection_coords()
        self.set_band_file_names()


    def set_global_metadata(self):
        mtl_data = self.mtlstring
        
        # print(mtl_data['IMAGE_ATTRIBUTES']['SPACECRAFT_ID'])
        
        self.data_provider = 'UKNOWN'
        mtl_product_info = mtl_data['IMAGE_ATTRIBUTES']
        # mtl_metadata_info = mtl_data['METADATA_FILE_INFO']
        satellite = mtl_product_info['SPACECRAFT_ID']
        print(satellite)
        self.satellite = satellite
        self.instrument = mtl_product_info['SENSOR_ID']
        self.acquisition_date = mtl_product_info['DATE_ACQUIRED']
        self.scene_center_time = mtl_product_info['SCENE_CENTER_TIME']
        self.product_id = mtl_data['PRODUCT_CONTENTS']['LANDSAT_PRODUCT_ID']
        self.lpgs_metadata_file = "A.MTL"
        


    def get_global_metadata(self):
        print (self.data_provider)
        print (self.satellite)
        print (self.instrument)
        print (self.acquisition_date)
        print (self.scene_center_time)
        print (self.product_id)
        print (self.lpgs_metadata_file)



    def set_geography_coords(self):
        mtl_data = self.mtlstring
        cs_code = 32600 + mtl_data['PROJECTION_ATTRIBUTES']['UTM_ZONE']
        spatial_ref = osr.SpatialReference()
        spatial_ref.ImportFromEPSG(cs_code)
        self.spatial_ref = spatial_ref
        self.cs_code = cs_code
        mtl_product_info = mtl_data['PROJECTION_ATTRIBUTES']
        geo_ref_points = get_geo_ref_points(mtl_product_info)
        self.geo_ref_points = geo_ref_points
        coordinates = get_coords(geo_ref_points, spatial_ref)
        self.coord = coordinates


    def get_geography_coords(self):
        print(self.west)
        print(self.east)
        print(self.north)
        print(self.south)
        print(self.coord)

    def set_projection_coords(self):
        xmlstring = self.xmlstring

        xmlstring = re.sub(r'\sxmlns="[^"]+"', '', xmlstring, count=1)
        doc = ElementTree.fromstring(xmlstring)
        projection_parameters = doc.find('.//projection_information')
        for corner_point in projection_parameters.findall('corner_point'):
            if corner_point.attrib['location'] in 'UL':
                self.westx = corner_point.attrib['x']
                self.northy = corner_point.attrib['y']
            if corner_point.attrib['location'] in 'LR':
                self.eastx = corner_point.attrib['x']
                self.southy = corner_point.attrib['y']


    def get_projection_coords(self):
        #print(self.projection_information)
        print(self.westx)
        print(self.eastx)
        print(self.northy)
        print(self.southy)


    def set_band_file_names(self):
        self.file_dict = {}
        mtl_data = self.mtlstring
        mtl_product_info = mtl_data['PRODUCT_CONTENTS']
        satellite = mtl_data['IMAGE_ATTRIBUTES']['SPACECRAFT_ID']
        bands, prod_type  = satellite_ref(satellite)

        for band in bands:
            print(band)
            print(band[0][0])
            #print(band[0][1])
            band_num, band_name = band
            # band_file_name = mtl_product_info['FILE_NAME_BAND_' + band[0][0]]
            band_file_name = mtl_product_info['FILE_NAME_BAND_' + band_num]
            print(band_name)
            self.file_dict[band_name] = band_file_name
        pp = pprint.PrettyPrinter(depth=6)
        pp.pprint (self.file_dict)



In [7]:
raw_MTL['IMAGE_ATTRIBUTES']['SPACECRAFT_ID']

'LANDSAT_8'

In [8]:
bucket_name = 'deafrica-usgs-c2-data'
path='usgs_ls8c_level2_2/157/070/2015/11/08/LC08_L2SP_157070_20151108_20190911_02_T1_MTL.txt'
dir = 'usgs_ls8c_level2_2/157/070/2015/11/08/'

meta_blob = MetaBlob(dir, raw_MTL)

LANDSAT_8
('1', 'sr_band1')
1
sr_band1
('2', 'blue')
2
blue
('3', 'green')
3
green
('4', 'red')
4
red
('5', 'nir')
5
nir
('6', 'swir1')
6
swir1
('7', 'swir2')
7
swir2
{'blue': 'LC08_L2SP_157070_20151108_20190911_02_T1_SR_B2.TIF',
 'green': 'LC08_L2SP_157070_20151108_20190911_02_T1_SR_B3.TIF',
 'nir': 'LC08_L2SP_157070_20151108_20190911_02_T1_SR_B5.TIF',
 'red': 'LC08_L2SP_157070_20151108_20190911_02_T1_SR_B4.TIF',
 'sr_band1': 'LC08_L2SP_157070_20151108_20190911_02_T1_SR_B1.TIF',
 'swir1': 'LC08_L2SP_157070_20151108_20190911_02_T1_SR_B6.TIF',
 'swir2': 'LC08_L2SP_157070_20151108_20190911_02_T1_SR_B7.TIF'}


In [9]:
meta_blob.satellite

'LANDSAT_8'

In [10]:
type(meta_blob)

__main__.MetaBlob

In [11]:
meta_blob.get_global_metadata()

UKNOWN
LANDSAT_8
OLI_TIRS
2015-11-08
06:38:57.8758280Z
LC08_L2SP_157070_20151108_20190911_02_T1
A.MTL


In [12]:
import uuid
def make_doc_from_meta_blob(meta_blob):
    

    ####
    level = meta_blob.product_id.split('_')[1]
    images, product_type = satellite_ref(meta_blob.satellite)
    # print("IMAGES",images)
    center_dt = meta_blob.acquisition_date + " " + meta_blob.scene_center_time
    start_time = center_dt
    end_time = center_dt

 
    docdict = {
        'id': str(uuid.uuid4()),
        'processing_level': str(level),
        'product_type': product_type,
        'creation_dt': meta_blob.acquisition_date,
        'platform': {'code': meta_blob.satellite},
        'instrument': {'name': meta_blob.instrument},
        'extent': {
            'from_dt': str(start_time),
            'to_dt': str(end_time),
            'center_dt': str(center_dt),
            'coord': meta_blob.coord,
        },
        'format': {'name': 'GeoTiff'},

        'grid_spatial': {
            'projection': {
                'geo_ref_points': meta_blob.coord,
                'spatial_reference': meta_blob.spatial_ref,
            }
        },
        'image': {
            'bands': {
                image[1]: {
                    'path': meta_blob.file_dict[image[1]],
                    'layer': 1,
                } for image in images
            }
        },

        'lineage': {'source_datasets': {}}
    }
    # print (docdict)
    return docdict

In [13]:
metadata_doc = make_doc_from_meta_blob(meta_blob)

In [14]:
metadata_doc

{'id': '905c8be0-b2f9-4e6f-adfc-6d4aeae1008a',
 'processing_level': 'L2SP',
 'product_type': 'espa',
 'creation_dt': '2015-11-08',
 'platform': {'code': 'LANDSAT_8'},
 'instrument': {'name': 'OLI_TIRS'},
 'extent': {'from_dt': '2015-11-08 06:38:57.8758280Z',
  'to_dt': '2015-11-08 06:38:57.8758280Z',
  'center_dt': '2015-11-08 06:38:57.8758280Z',
  'coord': {'ul': {'lon': 48.61581264855099, 'lat': -13.391720320317072},
   'ur': {'lon': 50.83743424966636, 'lat': -13.402923511569023},
   'll': {'lon': 48.5931986965967, 'lat': -15.502896984863423},
   'lr': {'lon': 50.835890519587856, 'lat': -15.515946389575051}}},
 'format': {'name': 'GeoTiff'},
 'grid_spatial': {'projection': {'geo_ref_points': {'ul': {'lon': 48.61581264855099,
     'lat': -13.391720320317072},
    'ur': {'lon': 50.83743424966636, 'lat': -13.402923511569023},
    'll': {'lon': 48.5931986965967, 'lat': -15.502896984863423},
    'lr': {'lon': 50.835890519587856, 'lat': -15.515946389575051}},
   'spatial_reference': <osgeo

In [15]:
def create_footprint(coord):

    # still need to figure out if its lon,lat or lat,lon
    # in most cases the routines are now longitude the latitude or (x then y)
    # print("TONY foot coord:", coord)

    foot = {
                "type": "Polygon", 
                "coordinates": [
                    [
                        [
                            float(coord['ul']['lon']),
                            float(coord['ul']['lat'])
                        ], 
                        [
                            float(coord['ur']['lon']),
                            float(coord['ur']['lat'])
                        ], 
                        [
                            float(coord['lr']['lon']),
                            float(coord['lr']['lat'])
                        ], 
                        [
                            float(coord['ll']['lon']),
                            float(coord['ll']['lat'])
                        ],
                        [
                            float(coord['ul']['lon']),
                            float(coord['ul']['lat'])
                        ] 
                    ]
                ]

    } 

    # print (foot)

    return foot

def elastic_flatten_doc(mdoc):
    foot = create_footprint(mdoc['extent']['coord'])
    elastic_doc = {
                    'creation_dt': mdoc['creation_dt'],
                    'processing_level': mdoc['processing_level'],
                    'red': mdoc['image']['bands']['red']['path'],
                    'green': mdoc['image']['bands']['green']['path'],
                    'blue': mdoc['image']['bands']['blue']['path'],
                    'nir': mdoc['image']['bands']['nir']['path'],
                    #'pixel_qa': mdoc['image']['bands']['quality_l2_aerosol']['path'],
                    'ul': {
                            'lat': mdoc['extent']['coord']['ul']['lat'],
                            'lon': mdoc['extent']['coord']['ul']['lon'],
                        },
                    'lr': {
                            'lat': mdoc['extent']['coord']['lr']['lat'],
                            'lon': mdoc['extent']['coord']['lr']['lon'],
                        },
                    'footprint': foot
                }
    return elastic_doc

In [16]:
elastic_flatten_doc
elastic_ready_doc = elastic_flatten_doc(metadata_doc)

In [17]:
elastic_ready_doc

{'creation_dt': '2015-11-08',
 'processing_level': 'L2SP',
 'red': 'LC08_L2SP_157070_20151108_20190911_02_T1_SR_B4.TIF',
 'green': 'LC08_L2SP_157070_20151108_20190911_02_T1_SR_B3.TIF',
 'blue': 'LC08_L2SP_157070_20151108_20190911_02_T1_SR_B2.TIF',
 'nir': 'LC08_L2SP_157070_20151108_20190911_02_T1_SR_B5.TIF',
 'ul': {'lat': -13.391720320317072, 'lon': 48.61581264855099},
 'lr': {'lat': -15.515946389575051, 'lon': 50.835890519587856},
 'footprint': {'type': 'Polygon',
  'coordinates': [[[48.61581264855099, -13.391720320317072],
    [50.83743424966636, -13.402923511569023],
    [50.835890519587856, -15.515946389575051],
    [48.5931986965967, -15.502896984863423],
    [48.61581264855099, -13.391720320317072]]]}}

In [19]:
import json
elastic_json_record = json.dumps(elastic_ready_doc)

In [20]:
elastic_json_record

'{"creation_dt": "2015-11-08", "processing_level": "L2SP", "red": "LC08_L2SP_157070_20151108_20190911_02_T1_SR_B4.TIF", "green": "LC08_L2SP_157070_20151108_20190911_02_T1_SR_B3.TIF", "blue": "LC08_L2SP_157070_20151108_20190911_02_T1_SR_B2.TIF", "nir": "LC08_L2SP_157070_20151108_20190911_02_T1_SR_B5.TIF", "ul": {"lat": -13.391720320317072, "lon": 48.61581264855099}, "lr": {"lat": -15.515946389575051, "lon": 50.835890519587856}, "footprint": {"type": "Polygon", "coordinates": [[[48.61581264855099, -13.391720320317072], [50.83743424966636, -13.402923511569023], [50.835890519587856, -15.515946389575051], [48.5931986965967, -15.502896984863423], [48.61581264855099, -13.391720320317072]]]}}'

In [23]:
from c2indexLib.elastic_index import store_record
from c2indexLib.elastic_meta import elastic_flatten_doc
from c2indexLib.elastic_index import connect_elasticsearch
from c2indexLib.elastic_index import l_create_index
def create_elastic_connection_and_index(index_name, record_type):
    es_conn = connect_elasticsearch()
    # delete any old indexes - similar to clearing the postgres db
    es_conn.indices.delete(index='datalake', ignore=[400, 404])

    # create new elastic search index
    
    l_create_index(es_conn, index_name, record_type)

    return es_conn


In [24]:
index_name='datalake'
record_type = 'odclite'


es_conn = create_elastic_connection_and_index(index_name, record_type)

Yay Connect
Created Index


In [25]:
store_record(es_conn, index_name, record_type, elastic_json_record)