In [1]:
#TODO
# Integrate Leaflet in area selection
# Verify if all tiles are fetched
# Export in OruxMap format (!!!)
#
# long = x, lat = y
# Map data geoportail (see for 4umaps.com or OSM)

In [2]:
import io
import numpy
from PIL import Image
import aiohttp
import asyncio
import pyproj
from lxml import etree

In [3]:
#XML DATA
_FILE_PATH = 'wmts.xml'
_DEF_NS = {"foo": "http://www.opengis.net/wmts/1.0","ows":"http://www.opengis.net/ows/1.1"}

In [4]:
base_url = "https://wxs.ign.fr/an7nvfzojv5wa96dsga5nk8w/geoportail/wmts"
_headers = {
        'Accept' : 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Encoding': 'gzip, deflate',
        'Accept-Language': 'en-US,en;q=0.5',
        'Connection': 'keep-alive',
        'Host': 'wxs.ign.fr',
        'Referer': 'http://www.geoportail.gouv.fr/swf/geoportal-visu-1.3.2.swf',
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:22.0) Gecko/20100101 Firefox/22.0',
}
#to get more info : https://wxs.ign.fr/an7nvfzojv5wa96dsga5nk8w/geoportail/wmts?Service=WMTS&Request=GetCapabilities&Version=1.0.0&Layer=GEOGRAPHICALGRIDSYSTEMS.MAPS.SCAN25TOUR.CV

In [5]:
def get_scale_denom_info(file):
    """Return dict with tile matrix set identifier, zoom(scale), scale denom(meters/pixel)"""
    data = {}
    tree = etree.parse(_FILE_PATH)
    
    TileMatrixSets = tree.xpath('/foo:Capabilities/foo:Contents/foo:TileMatrixSet', namespaces = _DEF_NS)
    for tilematrixset in TileMatrixSets:
        identifier = tilematrixset.find('ows:Identifier', namespaces = _DEF_NS).text
        supportedcrs = tilematrixset.find('ows:SupportedCRS', namespaces = _DEF_NS).text
        data[identifier] = {}
        data[identifier]['supportedcrs'] = supportedcrs
        TileMatrixes = tilematrixset.findall('foo:TileMatrix', namespaces = _DEF_NS)
        for tilematrix in TileMatrixes:
            scale = tilematrix.find('ows:Identifier', namespaces = _DEF_NS).text
            #Dénominateur d'échelle = résolution / taille pixel ; taille de pixel arbitraire = 0.00028 m
            scale_denom = float(tilematrix.find('foo:ScaleDenominator', namespaces = _DEF_NS).text) * 0.00028 
            data[identifier][scale]=scale_denom
            str_ref = tilematrix.find('foo:TopLeftCorner', namespaces = _DEF_NS).text
            (data[identifier]['X_ref'],data[identifier]['Y_ref']) =  map(float, str_ref.split(" "))
    return data

In [6]:
#Get all information about tile sets
MATRIX_SET_DATA = get_scale_denom_info(_FILE_PATH)

In [7]:
def convert_coord(long_lat, matrix_data, zoom, projection ):
    """convert decimals coords into Web Mercator coords into tile coords"""
    #See https://geoservices.ign.fr/documentation/geoservices/wmts.html 
    
    long_lat_ref = pyproj.Proj(init='epsg:4326')
    
    final_proj = pyproj.Proj(init=matrix_data[projection]['supportedcrs'])
    tile_res = matrix_data[projection][str(zoom)]*256 #256 pixel per tile
    
    #Web Mercator
    (x, y) = pyproj.transform(long_lat_ref, final_proj, long_lat[0], long_lat[1])

    # Get the top left corner
    (x0, y0) = (matrix_data[projection]['X_ref'], matrix_data[projection]['Y_ref'])
    
    #Tile coords depending on matrix_data (see get_scale_denom_info function)
    (xf, yf) = (int((x - x0) / tile_res), int((y0 - y) / tile_res))
    print("Zoom: "+ str(zoom) +" Tile Rez: " + str(tile_res)+ " X,Y: "+str(xf)+","+str(yf))
        
    return(xf, yf)

In [8]:
def get_map(long_lat_1, long_lat_2, zoom = 15, projection = "PM"):
    
    #Map limits
    top_left_corner = convert_coord((min(long_lat_1[0],long_lat_2[0]),max(long_lat_1[1],long_lat_2[1])),
                                    MATRIX_SET_DATA, zoom, projection)
        
    bot_right_corner = convert_coord((max(long_lat_1[0],long_lat_2[0]),min(long_lat_1[1],long_lat_2[1])),
                                     MATRIX_SET_DATA, zoom, projection)
    
    #Initialization of data contenant
    contenant = numpy.empty([top_left_corner[1]-bot_right_corner[1]+1,bot_right_corner[0]-top_left_corner[0]+1,256,256,3], dtype = 'uint8')
        
    
    async def put_tile(session, i, j, zoom):
        #data = await get_tile(session, i, j, zoom)
        contenant[Row][Col] = Image.open(io.BytesIO(data))
        
    async def get_tile(session, i, j, zoom):
        _values = default_values
        _values['TileCol'] = i
        _values['TileRow'] = j
        _values['TileMatrix'] = zoom

        async with session.request('GET', base_url, params = _values) as resp:
            print(resp.status)
            assert resp.status == 200
            return await  resp.read()
        
    async def main(loop):
        tasks = []
        async with aiohttp.ClientSession(loop=loop, headers = _headers ) as session:
            for j in range(bot_right_corner[1],top_left_corner[1]):
                for i in range(top_left_corner[0],bot_right_corner[0]):
                    task = asyncio.create_task(put_tile(session, i, j, zoom))
                    tasks.append(task)
            await asyncio.gather(*tasks)
            

    loop = asyncio.get_event_loop()
    loop.create_task(main(loop))
    
    return contenant

In [9]:
test = get_map((1.4566024,43.611715),(2.4566024,43.611715), zoom = 12)

Zoom: 12 Tile Rez: 9783.93962050256 X,Y: 2064,1495
Zoom: 12 Tile Rez: 9783.93962050256 X,Y: 2075,1495


In [10]:
Image.fromarray(numpy.concatenate(numpy.concatenate(test, axis= 1 ), axis=1)).save("output.jpg")