In [1]:
import os

In [2]:
# Make sure the log directory exists before logging is configured.
# exist_ok=True replaces the separate os.path.exists() check, which could race
# with another process creating the directory between the check and the call.
os.makedirs("logs", exist_ok=True)

In [3]:
import sys

from importlib import reload
import logging
reload(logging)

import time

# One log file per run, named after the start time in epoch seconds.
# Records at INFO level and above are written as "<timestamp> <message>".
run_log_path = f'logs/{int(time.time())}.log'
logging.basicConfig(level=logging.INFO,
                    filename=run_log_path,
                    format='%(asctime)s %(message)s')


import pandas as pd
import numpy as np

from glob import glob
from tqdm import tqdm_notebook as tqdm
# import tqdm

import matplotlib.pyplot as plt

import zipfile
import json

from io import StringIO, BytesIO

from minio import Minio
from minio.error import ResponseError

import boto3
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


In [17]:
# Load the connection settings; the "minio_config" section is read when the
# MinIO and boto3 clients are created.
with open("config.json", "r") as config_file:
    config = json.load(config_file)

In [18]:
# Build the MinIO client from the same endpoint URL that the boto3 client uses.
# Minio() takes a bare "host:port" plus a `secure` flag instead of a full URL,
# so the scheme is split off and TLS is enabled only for https:// endpoints.
# (Previously only "http://" was stripped and secure was hard-coded to False,
# so an https:// endpoint could not work.)
_minio_endpoint_url = config["minio_config"]["endpoint_url"]
minioClient = Minio(_minio_endpoint_url.split("://", 1)[-1].rstrip("/"),
                    access_key=config["minio_config"]["aws_access_key_id"],
                    secret_key=config["minio_config"]["aws_secret_access_key"],
                    secure=_minio_endpoint_url.lower().startswith("https://"))

In [19]:
# Columns read from every diagnostics CSV. The two "Unnamed" columns are renamed
# to "TimeStamp" and "Record Number" after loading, and the "MSTEP_A_" axle
# columns are mapped to/from their un-prefixed names depending on the file header.
cols = [
    "Unnamed: 0",
    "Unnamed: 1",
    "speedLimit",
    "trainSpeed",
    "targetSpeed",
    "TransponderOK",
    "MSTEP_A_axle1RawSpeed",
    "MSTEP_A_axle2RawSpeed",
    "warningSpeed",
    "line",
    "VehicleID",
]

In [20]:
# Row cap forwarded as `nrows` to pd.read_csv for each diagnostics CSV;
# None means every row is read.
NROWS = None

In [21]:
def replace_element(l, old, new):
    """Return a new list where every item of ``l`` equal to ``old`` is swapped for ``new``.

    The input list is left untouched; items that do not compare equal to
    ``old`` are copied over as they are.
    """
    swapped = []
    for item in l:
        if item == old:
            swapped.append(new)
        else:
            swapped.append(item)
    return swapped

In [22]:
def get_diagnostics_df_from_bytes(bytes_data, cols=None, nrows=None):
    """Build a dataframe from the raw bytes of a diagnostics csv file.

    Parameters
    ----------
    bytes_data : bytes
        UTF-8 encoded csv content (e.g. a member read from a zip archive).
    cols : sequence of str, optional
        Columns to load. The axle columns may be requested under their
        "MSTEP_A_" prefixed names; when the file header does not contain the
        prefixed name, the un-prefixed name is requested instead.
        ``None`` loads every column.
    nrows : int, optional
        Maximum number of data rows to read; ``None`` reads all rows.

    Returns
    -------
    pandas.DataFrame
        The selected columns, with "Unnamed: 0"/"Unnamed: 1" renamed to
        "TimeStamp"/"Record Number", the axle columns always exposed as
        "axle1RawSpeed"/"axle2RawSpeed", and every "*" cell replaced by the
        previous value in its column.

    Raises
    ------
    ValueError
        Raised by pandas when a requested column is missing from the file.
    """
    # string representation of the data
    text = str(bytes_data, 'utf-8')

    # Parse only the header row so that column detection compares exact column
    # names. (Splitting a fixed-size prefix of the text on commas glues the
    # last header field to the first data row and cuts long headers short.)
    header = set(pd.read_csv(StringIO(text), nrows=0).columns)

    if cols is None:
        usecols = None
    else:
        # Fall back to the un-prefixed axle names when the prefixed ones are
        # absent from this file.
        fallback = {
            prefixed: plain
            for prefixed, plain in (("MSTEP_A_axle1RawSpeed", "axle1RawSpeed"),
                                    ("MSTEP_A_axle2RawSpeed", "axle2RawSpeed"))
            if prefixed not in header
        }
        usecols = [fallback.get(name, name) for name in cols]

    df = pd.read_csv(StringIO(text),
                     nrows=nrows,
                     usecols=usecols)

    # Normalise column names; mapping keys that are not present are ignored.
    df = df.rename(columns={"Unnamed: 0": "TimeStamp",
                            "Unnamed: 1": "Record Number",
                            "MSTEP_A_axle1RawSpeed": "axle1RawSpeed",
                            "MSTEP_A_axle2RawSpeed": "axle2RawSpeed"})

    # Replacing * by previous value, since it means "no change in value"
    df = df.replace("*", np.nan).ffill()

    return df

In [23]:
def write_df_to_disk(df, outname):
    """Save ``df`` as a csv file at ``outname`` without the index column.

    The parent directory of ``outname`` is created first when the path has one.
    The target path is logged at INFO level.
    """
    logging.info(f"writing dataframe to {outname} on disk")

    parent_dir = os.path.dirname(outname)
    needs_directory = parent_dir != ""
    if needs_directory:
        os.makedirs(parent_dir, exist_ok=True)

    df.to_csv(outname, index=False)

In [24]:
def write_df_to_minio(df, outname, bucket_name, minioclient):
    """Upload ``df`` as a UTF-8 csv object named ``outname`` to ``bucket_name``.

    The bucket is created through ``minioclient`` when it does not exist yet.
    The destination is logged at INFO level.
    """
    logging.info(f"writing dataframe to {outname} in minio bucket {bucket_name}")

    # NOTE(review): the index column is included here, whereas write_df_to_disk
    # writes with index=False -- confirm which layout consumers expect.
    payload = df.to_csv().encode('utf-8')

    bucket_missing = not minioclient.bucket_exists(bucket_name)
    if bucket_missing:
        minioclient.make_bucket(bucket_name)

    minioclient.put_object(
        bucket_name,
        outname,
        data=BytesIO(payload),
        length=len(payload),
        content_type='application/csv',
    )

In [25]:
def log_processed_files(text, logfile="processedfiles.csv"):
    """Record ``text`` in ``logfile`` unless an identical line is already present.

    Parameters
    ----------
    text : str
        Entry to record (one entry per line).
    logfile : str
        Path of the log file; it is created on first use.

    Entries are compared as whole lines. A substring test would wrongly treat
    an entry as already recorded whenever it is contained in a longer entry.
    """
    already_logged = False
    if os.path.exists(logfile):
        with open(logfile, "r") as file:
            already_logged = any(line.rstrip("\n") == text for line in file)

    if not already_logged:
        # Append mode also creates the file when it does not exist yet.
        with open(logfile, "a") as file:
            file.write(f"{text}\n")

In [26]:
def file_is_processed(text, logfile="processedfiles.csv"):
    """Tell whether ``text`` is recorded as a full line of ``logfile``.

    Parameters
    ----------
    text : str
        Entry to look up.
    logfile : str
        Path of the log file with one entry per line.

    Returns
    -------
    bool
        True when a line of the file equals ``text``; False otherwise,
        including when the file is missing or empty.

    The file is read as plain lines rather than parsed as csv: csv parsing
    splits entries containing commas and converts numeric-looking entries,
    so such entries could never be found, and it fails on an empty file.
    """
    if not os.path.exists(logfile):
        return False

    with open(logfile, "r") as file:
        return any(line.rstrip("\n") == text for line in file)

In [27]:
# Bucket whose objects are listed and downloaded as the input zip archives.
INPUT_BUCKET_NAME = "guillaume"

In [28]:
# List the input bucket through the S3 API and keep the keys of the zip
# archives stored under an "Output Zip Files" folder.
s3_settings = config["minio_config"]
client = boto3.client(
    "s3",
    endpoint_url=s3_settings["endpoint_url"],
    aws_access_key_id=s3_settings["aws_access_key_id"],
    aws_secret_access_key=s3_settings["aws_secret_access_key"],
    region_name=s3_settings["region_name"],
)

paginator = client.get_paginator("list_objects_v2")

files = []
for page in paginator.paginate(Bucket=INPUT_BUCKET_NAME):
    # Pages without a "Contents" entry contribute nothing.
    for obj in page.get("Contents", []):
        key = obj["Key"]
        if key.endswith(".zip") and "/Output Zip Files/" in key:
            files.append(key)
    

In [29]:
def process_file(f):
    """Filter the diagnostics of one input zip archive and upload the result.

    The archive ``f`` (an object key in ``INPUT_BUCKET_NAME``) is downloaded,
    every member whose name contains "Diagno" and ends with "csv" is loaded
    with ``get_diagnostics_df_from_bytes``, the frames are concatenated and
    written to the "odometryclassification" bucket as
    ``filtered/<VehicleID>_<YYYY-MM-DD>.csv``, using the vehicle id and date
    of the row with the earliest timestamp.

    Returns
    -------
    str
        "processed"                  -- ``f`` is already in the processed log
        "bad zip file"               -- the object is not a valid zip (logged as processed)
        "problem in diagnostic file" -- a diagnostics member yielded no dataframe
        "no diagnostic file"         -- no diagnostics rows were found (logged as processed)
        "success"                    -- the filtered csv was uploaded (logged as processed)
    """
    if file_is_processed(f):
        logging.info(f"{f} is already processed.")
        return "processed"

    # Download the whole archive into memory, then always hand the HTTP
    # connection back to the pool, even when reading fails.
    response = minioClient.get_object(INPUT_BUCKET_NAME, f)
    try:
        zip_bytes = response.read()
    finally:
        response.close()
        response.release_conn()

    try:
        zf = zipfile.ZipFile(BytesIO(zip_bytes))
    except zipfile.BadZipFile:
        logging.info(f"{f} is a bad zip file.")
        # Marked as processed so the broken archive is not downloaded again.
        log_processed_files(f)
        return "bad zip file"

    dfs = []
    with zf:
        diagnostic_files = [n for n in zf.namelist() if "Diagno" in n and n.endswith("csv")]

        for filename in diagnostic_files:
            try:
                csv_bytes = zf.read(filename)
            except KeyError:
                logging.info(f"Did not find {filename} in zip file {f}.")
                continue

            df = get_diagnostics_df_from_bytes(bytes_data=csv_bytes,
                                               cols=cols,
                                               nrows=NROWS)
            # Defensive guard: kept in case the loader signals a bad file with None.
            if df is None:
                logging.info(f"{f} is skipped.")
                return "problem in diagnostic file"
            dfs.append(df)

    # pd.concat([]) raises and .iloc[0] fails on an empty frame, so archives
    # without any diagnostics rows are reported instead of crashing the run.
    df_filt = pd.concat(dfs) if dfs else None
    if df_filt is None or df_filt.empty:
        logging.info(f"{f} contains no diagnostic data.")
        log_processed_files(f)
        return "no diagnostic file"

    # The earliest row supplies the vehicle id and the date used in the output name.
    # NOTE(review): two archives for the same vehicle and day map to the same
    # object name, so the later upload replaces the earlier one -- confirm intended.
    file_info = df_filt.set_index(pd.to_datetime(df_filt["TimeStamp"])).sort_index().iloc[0]
    vid = file_info["VehicleID"]
    ymd = file_info.name.strftime("%Y-%m-%d")
    output_csv_name = f"filtered/{vid}_{ymd}.csv"

    write_df_to_minio(df_filt, output_csv_name, "odometryclassification", minioClient)
    log_processed_files(f)
    return "success"

In [31]:
# Process every listed archive sequentially, with a progress bar.
progress = tqdm(files, total=len(files))
for zip_key in progress:
    process_file(zip_key)

HBox(children=(IntProgress(value=0, max=834), HTML(value='')))

KeyboardInterrupt: 