# Carburants (fuel) data extraction for a personal PoC

The aim is to automate data extraction, formatting and indexing in Elasticsearch.

Data on gas stations:
* id
* location (longitude, latitude, city, address, zip code)
* gas prices
* is it open or closed?
* opening hours

This notebook was used for testing and exploration; I also wrote a CLI, `carburants.py`, that does all of this in one line.

In [27]:
import json
import requests
import xmltodict

from collections import defaultdict
from copy import deepcopy
from io import BytesIO
from os import listdir, mkdir
from os.path import exists
from zipfile import ZipFile

URL_CARBURANTS_WEEK = 'http://donnees.roulez-eco.fr/opendata/jour'
URL_CARBURANTS_YEAR = 'http://donnees.roulez-eco.fr/opendata/annee'

In [4]:
def get_latest_carburant_values(data_type='week', destination_dir='carburants'):
    """Download a fuel-price zip archive and extract it into a directory.

    Args:
        data_type: ``'week'`` (case-insensitive) selects
            ``URL_CARBURANTS_WEEK``; any other value selects
            ``URL_CARBURANTS_YEAR``.
        destination_dir: directory the archive is extracted into.

    Returns:
        A list of ``'<destination_dir>/<member>'`` paths, one per file
        contained in the downloaded archive.

    Raises:
        requests.HTTPError: if the server answers with an error status.
        zipfile.BadZipFile: if the response body is not a zip archive.
    """
    if data_type.lower() == 'week':
        url = URL_CARBURANTS_WEEK
    else:
        url = URL_CARBURANTS_YEAR
    response = requests.get(url, timeout=60)
    # Fail loudly on HTTP errors instead of trying to unzip an error page.
    response.raise_for_status()
    with ZipFile(BytesIO(response.content)) as archive:
        archive.extractall(destination_dir)
        # Report only what this archive contained: listing the directory
        # would also return stale files from earlier runs, in arbitrary order.
        members = [name for name in archive.namelist() if not name.endswith('/')]
    return ['{}/{}'.format(destination_dir, name) for name in members]

def read_carburant_values(xml_file):
    """Read ``xml_file`` as ISO-8859-1 text and return it parsed by xmltodict."""
    with open(xml_file, encoding="ISO-8859-1") as source:
        raw_xml = source.read()
    return xmltodict.parse(raw_xml)


In [5]:
# Download and extract the weekly archive (default arguments); the call
# returns a list of file paths.
carburant_xml = get_latest_carburant_values()
# Parse one of the returned XML files: pop() takes the last path of the list.
carburant_pdvs = read_carburant_values(carburant_xml.pop())

In [6]:
def extract_pdv_data(pdvs):
    """Build a per-station dictionary from parsed ``pdv`` entries.

    Args:
        pdvs: iterable of station mappings (as found under
            ``['pdv_liste']['pdv']`` of the parsed XML). A single mapping is
            also accepted and treated as a one-element list.

    Returns:
        A ``defaultdict(dict)`` keyed by the station ``@id``. Each value holds
        ``cp``, ``localisation`` (``lat``/``lon``, i.e. the raw attributes
        divided by 100000, or an empty dict when they are not numeric),
        ``adresse`` and ``saufjour`` (lower-cased), ``prix`` (only when price
        data could be read) and ``ville``. Stations carrying a ``fermeture``
        key are skipped.

    Raises:
        KeyError: if a station lacks one of the mandatory fields
            (``@id``, ``@cp``, coordinates, ``adresse``, ``ouverture``,
            ``ville``).
    """
    # A lone child element is parsed as a mapping rather than a list.
    if isinstance(pdvs, dict):
        pdvs = [pdvs]

    data_pdvs = defaultdict(dict)
    for pdv in pdvs:
        pdv_id = pdv['@id']
        if 'fermeture' in pdv:
            continue

        cp = pdv['@cp']
        station = data_pdvs[pdv_id]
        station['cp'] = cp

        # Convert both coordinates before storing either, so a bad value
        # never leaves a half-filled location behind.
        station['localisation'] = {}
        try:
            lat = float(pdv['@latitude']) / 100000
            lon = float(pdv['@longitude']) / 100000
        except ValueError:
            pass
        else:
            station['localisation'] = {'lat': lat, 'lon': lon}

        station['adresse'] = pdv['adresse'].lower()
        station['saufjour'] = pdv['ouverture']['@saufjour'].lower()

        # Prices are optional: skip them when absent, empty or malformed.
        prices = pdv.get('prix')
        if prices is not None:
            try:
                station['prix'] = get_prices(prices)
            except KeyError:
                pass

        station['ville'] = pdv['ville']
    return data_pdvs
    
def get_prices(prices):
    """Convert parsed ``prix`` entries into a ``{name: price}`` dictionary.

    Args:
        prices: either one mapping with ``@nom`` and ``@valeur`` keys (a
            station with a single price) or an iterable of such mappings.

    Returns:
        A new dict mapping each ``@nom`` to ``float(@valeur) / 1000.0``.
        The input is left untouched.

    Raises:
        KeyError: if an entry lacks ``@nom`` or ``@valeur``.
    """
    # A station with a single price yields one mapping instead of a list.
    if isinstance(prices, dict):
        prices = [prices]
    return {
        price['@nom']: float(price['@valeur']) / 1000.0
        for price in prices
    }

In [7]:
# Build the per-station dictionary from the parsed list of pdv entries.
data_pdvs = extract_pdv_data(carburant_pdvs['pdv_liste']['pdv'])

In [8]:
# Look up one station by its id to check the extracted fields.
data_pdvs['90130001']

{'adresse': '2 rue de la libération',
 'cp': '90130',
 'localisation': {'lat': 47.60565, 'lon': 7.004},
 'prix': {'Gazole': 1.099, 'SP95': 1.329},
 'saufjour': '',
 'ville': 'Montreux-Château'}

In [9]:
# Dump the extracted stations to a JSON file; it is read back later to
# produce the Elasticsearch bulk file.
carburant_json = 'carburants.json'
with open(carburant_json, 'w') as fw:
    json.dump(data_pdvs, fw)

## Putting data in elasticsearch 

In [53]:
def rewrite_for_bulk(carburant_json, carburant_bulk, index_name='carburants', doc_type='pdv'):
    """Rewrite a ``{station_id: fields}`` JSON file as a bulk-indexing file.

    For every station two lines are written to ``carburant_bulk``: an
    ``index`` action line carrying ``_index``, ``_type`` and ``_id``, then the
    station document with its id added under the ``id`` key.

    Args:
        carburant_json: path of the JSON file to read.
        carburant_bulk: path of the bulk file to write.
        index_name: value written as ``_index``.
        doc_type: value written as ``_type``.

    Returns:
        The data loaded from ``carburant_json``, unmodified.
    """
    with open(carburant_json, 'r') as fp:
        carburants_data = json.load(fp)

    # JSON-encode the interpolated values so quotes or backslashes in a name
    # or id cannot break the action line.
    action_line = '{ "index" : { "_index" : %s, "_type" : %s, "_id" : %s } }\n'
    encoded_index = json.dumps(index_name)
    encoded_type = json.dumps(doc_type)

    with open(carburant_bulk, 'w') as fw:
        for pdv_id, pdv_infos in carburants_data.items():
            # Shallow copy: only a top-level key is added, and the loaded
            # data is returned to the caller untouched.
            pdv_for_es = {**pdv_infos, 'id': pdv_id}
            fw.write(action_line % (encoded_index, encoded_type, json.dumps(pdv_id)))
            fw.write(json.dumps(pdv_for_es) + '\n')
    return carburants_data

In [54]:
# Write the Elasticsearch bulk file from the JSON dump; `data` receives the
# stations loaded from that dump.
carburant_bulk = 'es_carburants.json'
data = rewrite_for_bulk(carburant_json, carburant_bulk)

### Put the mapping in the ES index

    curl -XPUT http://localhost:9200/carburants -d '
    {
        "mappings": {
            "pdv": {
                "properties": {
                    "id": {"type": "string"},
                    "ville": {"type": "string"},
                    "adresse": {"type": "string"},
                    "saufjour": {"type": "string"},
                    "cp": {"type": "string"},
                    "localisation": {"type": "geo_point"},
                    "prix": { 
                        "dynamic": "true",
                        "properties": {
                            "sp98": {"type": "string"},
                            "sp95": {"type": "string"},
                            "gazole": {"type": "string"},
                            "e10": {"type": "string"}
                        }
                    }
                }
            }
        }
    }
    '
    
    
### Bulk json data into the index 

    curl -s -XPOST localhost:9200/_bulk --data-binary "@es_carburants.json"; echo
    
It's there :) and we can query the closest gas stations thanks to ES's geo-point type! (Check the mapping.)
    
![Sense screenshot of the carburants index](img/es-carburants.png)


### Example querying stations using geo distance query

    # LATITUDE LONGITUDE QUERY
    GET carburants/_search
    {
      "query": {
        "bool": {
          "must": {
            "match_all": {}
          },
          "filter": {
            "geo_distance": {
              "distance": "1km",
              "localisation": {
                "lat": 42.6409,
                "lon": 2.877
              }
            }
          }
        }
      }
    }
    
    # LATITUDE LONGITUDE QUERY WITH SPECIFIC GAS TYPE
    GET carburants/_search
    {
      "query": {
        "bool": {
          "must": {
            "exists": {
              "field": "prix.SP95"
            }
          },
          "filter": {
            "geo_distance": {
              "distance": "1km",
              "localisation": {
                "lat": 42.6409,
                "lon": 2.877
              }
            }
          }
        }
      }
    }
  