In [1]:
import os
import io
import gzip
import uuid
import datetime
import zipfile as zp

import pandas as pd
import requests
from tqdm import tqdm
from datetime import datetime
from elasticsearch import Elasticsearch
from dateutil.parser import parse

# Client for the local Elasticsearch node used by every cell below.
es = Elasticsearch(hosts=[{"host": "localhost", "port": 9200}])

In [2]:
# Download the raw archive into memory. The archive is only read, so it is
# opened in "r" mode, and a failed HTTP request is reported immediately
# instead of surfacing later as a confusing BadZipFile error.
r = requests.get("https://www.labri.fr/perso/rgiot/cours/anavis/brut.zip", timeout=60)
r.raise_for_status()
zf = zp.ZipFile(io.BytesIO(r.content), mode="r")

In [3]:
# Station list of the bicincitta_parma system (name, latitude, longitude,
# elevation). The archive member is closed as soon as it has been parsed.
with zf.open("brut/bicincitta_parma_summary.csv") as summary_file:
    summary = pd.read_csv(summary_file, sep=";")
summary.head(30)

Unnamed: 0,system,station,latitude,longitude,elevation
0,bicincitta_parma,01. Duc,44.807118,10.332934,51.076065
1,bicincitta_parma,02. Ospedale Maggiore,44.802263,10.306275,56.344078
2,bicincitta_parma,03. Traversetolo,44.781595,10.344492,58.324486
3,bicincitta_parma,04. Campus Chimica,44.766433,10.314547,76.587212
4,bicincitta_parma,05. Stazione FF.SS.,44.809888,10.327693,57.179089
5,bicincitta_parma,06. Ponte di Mezzo,44.802243,10.32427,57.499104
6,bicincitta_parma,07. Santa Croce,44.804064,10.314443,54.25885
7,bicincitta_parma,08. Bixio,44.79635,10.317877,59.671028
8,bicincitta_parma,09. Farini,44.795387,10.327644,61.132015
9,bicincitta_parma,10. Barilla Center,44.798416,10.340688,53.977478


# Importation des données vers Kibana via Elasticsearch

## Nettoyage des index

In [14]:
# Drop the "velos" index left by a previous run so it can be recreated with
# the mapping below. Only the "index does not exist yet" case is skipped;
# any other failure (e.g. Elasticsearch unreachable) is raised instead of
# being silently swallowed by a bare except.
if es.indices.exists(index="velos"):
    es.indices.delete(index="velos")

In [15]:
# Explicit field types for the "velos" index: "timestamp" is a date and
# "location" a geo_point. All other fields are left to dynamic mapping.
mapping = {
    "mappings": {
        "properties": {
            "timestamp": {"type": "date"},
            "location": {"type": "geo_point"},
        }
    }
}

es.indices.create(index="velos", body=mapping)

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'velos'}

## Importation des données

In [6]:
def load_directory(d_name, data_dir):
    """
    Yield one DataFrame per gzip-compressed ";"-separated CSV file of a sub-directory.

    For every file, after reading:
      * "Timestamp" values are parsed and rewritten as ISO-8601 strings,
      * the first 4 characters of every "Station" value are dropped,
      * all column names are lower-cased.

    Args:
        d_name: name of the sub-directory to read.
        data_dir: root directory containing the sub-directories.

    Yield:
        A DataFrame for each file in ``data_dir/d_name``, in sorted file-name
        order so that repeated runs process files identically.
    """
    dir_path = os.path.join(data_dir, d_name)
    for f_name in sorted(os.listdir(dir_path)):
        # Close the gzip stream as soon as the CSV is parsed, instead of leaking
        # one open handle per file (and keeping it open across the yield).
        with gzip.open(os.path.join(dir_path, f_name), "rb") as file:
            df = pd.read_csv(file, sep=";")
        df["Timestamp"] = df["Timestamp"].apply(lambda x: parse(x).isoformat())
        # NOTE(review): presumably strips a "NN. " numbering prefix (4 chars,
        # as in "01. Duc") — confirm against the raw files.
        df["Station"] = df["Station"].apply(lambda station: station[4:])
        df.columns = [c.lower() for c in df.columns]
        yield df

In [7]:
def check_interval(row, date1, date2):
    """
    Check whether a row's timestamp lies in the interval [date1, date2], bounds included.

    The bounds are inclusive so that a call such as
    ("2016-01-01 00:00:00", "2016-12-31 23:59:59") keeps records taken exactly
    at either end. The comparison is done on ISO-8601 strings, which orders
    them chronologically as long as all values share the same format and UTC
    offset.

    Args:
        row: row holding a "timestamp" entry (ISO-8601 string or datetime-like).
        date1: start of the interval, any string accepted by dateutil's ``parse``.
        date2: end of the interval, any string accepted by dateutil's ``parse``.

    Return:
        True if date1 <= timestamp <= date2, False otherwise.
    """
    start = parse(date1).isoformat()
    end = parse(date2).isoformat()
    timestamp = row["timestamp"]
    if not isinstance(timestamp, str):
        # Datetime-like values are turned into the same ISO form before
        # comparing; comparing them directly to a str would raise TypeError.
        timestamp = timestamp.isoformat()
    return start <= timestamp <= end

In [8]:
def fix_data(row):
    """
    Replace every "None" string of a row by 0.0 (e.g. a missing WindDeg value).

    The row is modified in place and also returned.

    Args:
        row: pandas Series holding one measurement.

    Return:
        The same ``row`` object, with its "None" entries set to 0.0.
    """
    values = row.to_dict()
    for label in values:
        if values[label] == "None":
            row[label] = 0.0
    return row

In [16]:
def _station_coordinates(station_name):
    """
    Look up the (longitude, latitude) of a station in the global ``summary`` table.

    The name is matched with or without a trailing "." (the summary contains
    names such as "05. Stazione FF.SS.").

    Args:
        station_name: station name as used for its data sub-directory.

    Return:
        (longitude, latitude) as floats, or None when there is no unique match.
    """
    matches = summary[(summary["station"] == station_name) | (summary["station"] == station_name + ".")]
    if len(matches) != 1:
        return None
    return float(matches["longitude"].iloc[0]), float(matches["latitude"].iloc[0])


def _build_document(row, longitude, latitude):
    """
    Turn one measurement row into the JSON document sent to Elasticsearch.

    Adds "location" ([longitude, latitude]), "occupation" (bikes / total in %,
    0 when total is not positive) and "day" (weekday name of the timestamp).

    Args:
        row: pandas Series holding one measurement.
        longitude: station longitude.
        latitude: station latitude.

    Return:
        The enriched row serialized with ``Series.to_json``.
    """
    row = fix_data(row)
    row["location"] = [longitude, latitude]
    if row["total"] > 0:
        row["occupation"] = row["bikes"] / row["total"] * 100
    else:
        row["occupation"] = 0
    timestamp = row["timestamp"]
    if isinstance(timestamp, str):
        timestamp = parse(timestamp)
    row["day"] = timestamp.strftime("%A")
    return row.to_json()


def _send_bulk(lines):
    """
    Send one bulk request to Elasticsearch and report the documents it rejected.

    Args:
        lines: alternating action / document JSON lines.
    """
    response = es.bulk(lines, request_timeout=100)
    # A bulk request can succeed overall while individual documents fail;
    # those failures are only visible in the per-item results.
    if response.get("errors"):
        rejected = [item["index"] for item in response["items"] if "error" in item.get("index", {})]
        first_error = rejected[0]["error"] if rejected else None
        print(f"\nBulk request: {len(rejected)} document(s) rejected, first error: {first_error}")


def load_2016_data():
    """
    Load the 2016 measurements of every station under ``clean/`` into the "velos" index.

    Each station sub-directory is matched to its coordinates in ``summary``.
    Sub-directories with no unique match, and plain files, are skipped with a
    message. Rows that fail to convert are reported and skipped. Documents
    are sent in bulk requests of 1000 lines (500 documents).
    """
    data_index = "velos"
    data_dir = "clean/"
    # Two lines (action + document) per indexed document.
    max_bulk_lines = 1000
    action = '{ "index" : { "_index" : "' + data_index + '" } }'
    datas = []
    progress = 0

    for d_name in sorted(os.listdir(data_dir)):
        if not os.path.isdir(os.path.join(data_dir, d_name)):
            continue
        coordinates = _station_coordinates(d_name)
        if coordinates is None:
            print(f"Station {d_name!r} has no unique entry in summary, skipped.")
            continue
        longitude, latitude = coordinates
        for df in load_directory(d_name, data_dir):
            for _, row in df.iterrows():
                try:
                    if not check_interval(row, "2016-01-01 00:00:00", "2016-12-31 23:59:59"):
                        continue
                    document = _build_document(row, longitude, latitude)
                except Exception as e:
                    # Best effort: report the faulty row and keep loading the others.
                    print(e)
                    continue
                datas.append(action)
                datas.append(document)
                progress += 1
                print(f"Processing doc {progress}.", end="\r")
                # The bulk call is kept outside the per-row try, so that a failed
                # request cannot leave a full, never-reset buffer behind.
                if len(datas) >= max_bulk_lines:
                    _send_bulk(datas)
                    datas = []
    # Send the remaining documents.
    if datas:
        _send_bulk(datas)

load_2016_data()

Processing doc 987419.