## Import

In [1]:
import sys

# NOTE(review): machine-specific absolute path; the project-local modules imported
# below (get_api_call_time, mongo_db) presumably live in this directory — confirm.
sys.path.append(r'C:\Users\Marco\Documents\GitHub\GeoSpatial-analysis\facility-location-Bergen\src\facility_location_Bergen\custome_modules')

In [2]:
# Get the database using the method we defined in pymongo_test_insert file
import os
import pytz
import copy
import json
import geojson
import numpy as np
import pandas as pd
from dateutil import parser
from pymongo import database
from get_api_call_time import get_api_call_time
from geojson import GeometryCollection, LineString
from kedro.extras.datasets.json import JSONDataSet
from mongo_db import retrieve_database_and_collections, take_empty_collections

## Connect to MongoDB

### Retrieve the database and the collection

Define the database name and the day identifying the collections

In [3]:
# day whose collections are processed (appears to be formatted as DD_MM_YYYY)
days = "20_04_2023"
# name of the MongoDB database passed to retrieve_database_and_collections
db_name = "facility_location_Bergen"

In [4]:
# look up the database and its "raw"/"processed" collections for `days`;
# take_empty_collections presumably keeps only those without documents — confirm in mongo_db
db, collections = retrieve_database_and_collections(db_name, days, ["raw", "processed"])
empty_collections = take_empty_collections(collections)

In [5]:
empty_collections

{}

### Insert documents in the collections

In [6]:
# NOTE(review): machine-specific absolute path to the raw data;
# compose_url_to_raw_data lists a sub-folder named after the day under this directory
root_dir = r"C:\Users\Marco\Documents\GitHub\GeoSpatial-analysis\facility-location-Bergen\data\01_raw\Bergen"

In [14]:
def compose_url_to_raw_data(db_name: str, day: str, root_dir: str):
    """Build the paths of the raw files for the day whose collections are still empty.

    Parameters
    ----------
    db_name : str
        Name of the database passed to ``retrieve_database_and_collections``.
    day : str
        Day identifier passed to ``retrieve_database_and_collections``.
    root_dir : str
        Directory that contains one sub-folder per day.

    Returns
    -------
    list
        Paths of every file found in the matching sub-folders of
        ``<root_dir>/<day>``; an empty list when no collection is empty.

    Raises
    ------
    ValueError
        If more than two collections are empty.
    """
    # retrieve database and collections wrappers (only the collections are used here)
    _, collections = retrieve_database_and_collections(db_name, day, ["raw", "processed"])
    empty_collections = take_empty_collections(collections)
    # more than two empty collections presumably means more than one new day
    # (one "raw" and one "processed" collection per day) — verify against mongo_db
    if len(empty_collections) > 2:
        raise ValueError("No more than one date can be processed at a time, please select only one new date.")
    # the last 10 characters of each collection name are taken as its day
    day_ = np.unique([key[-10:] for key in empty_collections.keys()])
    # nothing to load when every collection is already populated
    if len(day_) == 0:
        return []
    # os.path.join instead of a hard-coded "\\" so the paths are valid on any OS
    day_dir = os.path.join(root_dir, str(day_[0]))
    # keep only the entries whose first 10 characters are one of the days
    dir_names = [name for name in os.listdir(day_dir) if name[:10] in day_]
    dirs_urls = [os.path.join(day_dir, name) for name in dir_names]
    file_urls = [
        os.path.join(dir_url, file_name)
        for dir_url in dirs_urls
        for file_name in os.listdir(dir_url)
    ]
    return file_urls

Compose the URLs of the raw JSON files

In [8]:
# an empty list is returned when no collection for `days` is empty
urls = compose_url_to_raw_data(db_name, days, root_dir)

In [12]:
def from_urls_to_JSONDataSet(urls: list):
    """Wrap every file path in a ``JSONDataSet``.

    Parameters
    ----------
    urls : list
        Paths of the JSON files.

    Returns
    -------
    list
        One ``JSONDataSet(filepath=...)`` per path, in the same order as ``urls``.
    """
    return [JSONDataSet(filepath=path) for path in urls]

In [13]:
def load_raw_data(urls: list):
    """Load every raw JSON file and index it by the trailing part of its path.

    The key of a file is a fixed-length suffix of its path with the ``.json``
    extension stripped; the suffix length depends on which time-of-day marker
    ("afternoon", "midday" or "morning") the path contains.

    Parameters
    ----------
    urls : list
        Paths of the raw JSON files.

    Returns
    -------
    dict
        Maps each key to the content returned by the data set's ``load()``.

    Raises
    ------
    ValueError
        If a path contains none of the time-of-day markers. Previously this
        case either raised ``UnboundLocalError`` or silently reused the key
        of the preceding file.
    """
    # number of trailing path characters kept as key, per marker; the
    # insertion order is the order in which the markers are tested
    suffix_lengths = {"afternoon": 41, "midday": 38, "morning": 39}

    # derive all keys first so an unexpected path fails before any file is read
    keys = []
    for url in urls:
        for marker, length in suffix_lengths.items():
            if marker in url:
                keys.append(url[-length:].removesuffix(".json"))
                break
        else:
            raise ValueError(
                f"Cannot derive a key for {url!r}: expected 'afternoon', 'midday' or 'morning' in the path."
            )

    datasets = from_urls_to_JSONDataSet(urls)
    # named `dataset` rather than `json` to avoid shadowing the json module
    return {key: dataset.load() for key, dataset in zip(keys, datasets)}

Load the raw JSON files

In [13]:
# dict keyed by the trailing part of each file path (".json" suffix stripped)
raw_data = load_raw_data(urls)

In [14]:
raw_data

{}

In [6]:
def get_time_from_raw_data(raw_data: dict):
    """Compute the API call time of every raw data entry.

    Parameters
    ----------
    raw_data : dict
        Raw data; only its keys are used.

    Returns
    -------
    dict
        Maps each key of ``raw_data`` to ``get_api_call_time(key)``.
    """
    return {name: get_api_call_time(name) for name in raw_data.keys()}

In [7]:
def splitting_and_time_processing(raw_data: dict):
    """Flatten the raw payloads into one document per result and attach timestamps.

    Parameters
    ----------
    raw_data : dict
        Maps a key to a payload that has a ``"sourceUpdated"`` date string and
        a ``"results"`` list of dicts.

    Returns
    -------
    list
        One dict per element of every ``"results"`` list, extended with
        ``"sourceUpdated"`` (the parsed payload timestamp) and
        ``"api_call_time"`` (from ``get_time_from_raw_data``).

    Notes
    -----
    Each result is shallow-copied before the timestamps are added, so the
    dicts inside ``raw_data`` are left untouched. Nested values are still
    shared with the input.
    """
    api_call_times = get_time_from_raw_data(raw_data)
    documents = []

    for key, payload in raw_data.items():
        # parsed once per payload and shared by all of its results
        source_updated = parser.parse(payload["sourceUpdated"])

        for result in payload["results"]:
            # copy so that the caller's raw data is not modified
            document = dict(result)
            document["sourceUpdated"] = source_updated
            document["api_call_time"] = api_call_times[key]
            documents.append(document)

    return documents

In [17]:
def geometry_processing(input_data: list):
    """Add GeoJSON geometry to each document and flatten its ``location`` field.

    For every document the function

    * builds a ``GeometryCollection`` with one ``LineString`` per entry of
      ``doc['location']['shape']['links']`` (points given as ``(lng, lat)``),
    * stores the ``length`` of every link in ``"geometry_length"``,
    * moves the remaining keys of ``location`` to the top level and drops
      ``location`` and ``shape``.

    Parameters
    ----------
    input_data : list
        Documents with a ``location`` -> ``shape`` -> ``links`` structure.

    Returns
    -------
    list
        New documents, one per input document, in the same order.

    Raises
    ------
    KeyError
        If a document lacks ``location``, ``shape`` or ``links``.

    Notes
    -----
    The input documents are not modified: each output document is a new dict
    (nested values are shared), so the function can be re-run on the same input.
    """
    geo_processed_data = []

    for doc in input_data:
        location = doc['location']
        # extract the links field from the input data
        links = location['shape']['links']

        # start from every top-level field except the embedded one
        processed = {k: v for k, v in doc.items() if k != 'location'}
        # create the geometry field (in order to comply with the geojson format)
        processed["geometry"] = GeometryCollection(
            [LineString([(point['lng'], point['lat']) for point in link['points']])
             for link in links])
        # create the geometry_length field
        processed["geometry_length"] = [link['length'] for link in links]
        # bring embedded fields to the top level; 'shape' is superseded by
        # 'geometry', and a nested 'location' key would have been removed too
        processed.update(
            {k: v for k, v in location.items() if k not in ('location', 'shape')})

        geo_processed_data.append(processed)

    return geo_processed_data

In [18]:
def process_raw_data(urls: list):
    """Run the whole raw-to-processed pipeline on the given files.

    Parameters
    ----------
    urls : list
        Paths of the raw JSON files.

    Returns
    -------
    list
        The documents returned by ``geometry_processing`` after loading the
        files and applying ``splitting_and_time_processing``.
    """
    # load -> split per result and add timestamps -> add geometry and flatten
    timed_documents = splitting_and_time_processing(load_raw_data(urls))
    return geometry_processing(timed_documents)

Process raw data 

In [20]:
# list of flattened documents, one per element of each file's "results"
processed_data = process_raw_data(urls)

In [21]:
processed_data

[{'currentFlow': {'speed': 13.611112,
   'speedUncapped': 13.611112,
   'freeFlow': 15.555556,
   'jamFactor': 1.1,
   'confidence': 0.87,
   'traversability': 'open'},
  'sourceUpdated': datetime.datetime(2023, 4, 20, 5, 28, 9, tzinfo=tzutc()),
  'api_call_time': datetime.datetime(2023, 4, 20, 7, 30, tzinfo=<DstTzInfo 'Europe/Oslo' CEST+2:00:00 DST>),
  'geometry': {"geometries": [{"coordinates": [[5.32776, 60.47125], [5.32768, 60.47128], [5.32753, 60.47133]], "type": "LineString"}, {"coordinates": [[5.32753, 60.47133], [5.32742, 60.47136], [5.32734, 60.47138], [5.32715, 60.47142]], "type": "LineString"}, {"coordinates": [[5.32715, 60.47142], [5.32684, 60.47152], [5.32663, 60.47159], [5.32643, 60.47166], [5.32624, 60.47173]], "type": "LineString"}, {"coordinates": [[5.32624, 60.47173], [5.32617, 60.47176]], "type": "LineString"}, {"coordinates": [[5.32617, 60.47176], [5.32603, 60.47182], [5.32586, 60.4719], [5.32567, 60.472]], "type": "LineString"}, {"coordinates": [[5.32567, 60.472],

In [28]:
def insert_raw_data(urls: list, db_name: str, day: str):
    """Insert the raw JSON payloads into every empty "raw" collection.

    Parameters
    ----------
    urls : list
        Paths of the raw JSON files to load.
    db_name : str
        Name of the database passed to ``retrieve_database_and_collections``.
    day : str
        Day identifier passed to ``retrieve_database_and_collections``.

    Returns
    -------
    None
        Nothing is inserted when there is no raw data to insert.
    """
    # retrieve database and collections wrappers (only the collections are used here)
    _, collections = retrieve_database_and_collections(db_name, day, ["raw", "processed"])
    empty_collections = take_empty_collections(collections)

    # load the raw data
    documents = list(load_raw_data(urls).values())
    # insert_many raises on an empty list, so skip the insertion entirely
    if not documents:
        return

    # insert the documents in the empty collections whose name contains "raw"
    for name, collection in empty_collections.items():
        if "raw" in name:
            collection.insert_many(documents)

In [154]:
def insert_processed_data(processed_data: list, db_name: str, day: str):
    """Insert the processed documents into every empty "processed" collection.

    Parameters
    ----------
    processed_data : list
        Documents to insert (e.g. the list returned by ``process_raw_data``).
    db_name : str
        Name of the database passed to ``retrieve_database_and_collections``.
    day : str
        Day identifier passed to ``retrieve_database_and_collections``.

    Returns
    -------
    None
        Nothing is inserted when ``processed_data`` is empty.
    """
    # retrieve database and collections wrappers (only the collections are used here)
    _, collections = retrieve_database_and_collections(db_name, day, ["raw", "processed"])
    empty_collections = take_empty_collections(collections)

    documents = list(processed_data)
    # insert_many raises on an empty list, so skip the insertion entirely
    if not documents:
        return

    # collections whose name contains "raw" are never written here,
    # even if the name also contains "processed"
    for name, collection in empty_collections.items():
        if "raw" not in name and "processed" in name:
            collection.insert_many(documents)

Insert the data in the collections

In [155]:
# each call re-checks which collections are empty, so only the still-empty
# "raw" / "processed" collections for `days` receive documents
insert_raw_data(urls, db_name, days)
insert_processed_data(processed_data, db_name, days)