In [None]:
import requests
import json
import cv2
from PIL import Image
import datetime
import os
import os.path as osp
import shutil
import torch
import numpy as np

In [None]:
date = "2022-01-25"
time = "11:59:46"
lat, lon = "1.29531332", "103.871146"

### Getting Historical data using input date, time, location

In [None]:
def get_historical_images(date, time, lat, lon):
        images = {}
        date_obj = datetime.datetime.strptime(date+'T'+time,"%Y-%m-%dT%H:%M:%S")
        prev_days = [datetime.datetime.strftime(date_obj - datetime.timedelta(days=x),"%Y-%m-%dT%H:%M:%S") for x in range(1,8)]
        prev_days.append(datetime.datetime.strftime(date_obj - datetime.timedelta(minutes=15),"%Y-%m-%dT%H:%M:%S"))
        prev_days.append(datetime.datetime.strftime(date_obj - datetime.timedelta(minutes=30),"%Y-%m-%dT%H:%M:%S"))
        
        for prev_day in prev_days:
            try:
                response = requests.get(f"https://api.data.gov.sg/v1/transport/traffic-images?date_time={prev_day}")
                response_body = response.json()
                items = response_body["items"]
                if not (response_body.get("items",[]) and response_body["items"][0].get("cameras",[])):
                    print(f"No data for - {prev_day}")
                    continue
                cameras_list = response_body["items"][0]["cameras"]
                for camera_data in cameras_list:
                    location = camera_data["location"]
                    if lat == str(location['latitude']) and lon == str(location['longitude']):
                        images[prev_day] = camera_data['image']
                        break
            except Exception as e:
                print(f"Error while getting data for day - {prev_day} - {str(e)}")
                continue
        return images

def get_historical_weather_data(date, time, lat, lon):
        data = {}
        date_obj = datetime.datetime.strptime(date,"%Y-%m-%d")
        prev_days = [datetime.datetime.strftime(date_obj - datetime.timedelta(x),"%Y-%m-%d") for x in range(1,8)]
        nearest_station = ""
        minimum_distance = 180
        for prev_day in prev_days:
            try:
                response = requests.get(f"https://api.data.gov.sg/v1/environment/rainfall?date_time={prev_day}T{time}")
                response_body = response.json()
                items = response_body["items"]
                stations = response_body["metadata"]["stations"]
                if nearest_station == "":
                    for station in stations:
                        tem = abs(float(lat) - station["location"]["latitude"]) + abs(float(lon) - station["location"]["longitude"])
                        if tem < minimum_distance:
                            minimum_distance = tem
                            nearest_station = station["id"]
                    if minimum_distance == 180:
                        print(f"No data for - {prev_day}")
                        continue
                for rain_fall in items[0]["readings"]:
                    if rain_fall["station_id"] == nearest_station:
                        data[prev_day]=rain_fall["value"]
                        break
            except Exception as e:
                print(f"Error while getting data for day - {prev_day} - {str(e)}")
                continue
        return data
    
# get_historical_weather_data(date, time, lat, lon)
# get_historical_images(date, time, lat, lon)

## Detect number of cars in images - using Cascade RCNN object detection model

### Trained Model Link:

https://drive.google.com/file/d/18E3I8-GeiXXczpwBVRG7vTfVo-pi7aDq/view?usp=sharing

In [None]:
from mmdet.apis import init_detector, inference_detector
import mmcv

def detect_vehicles(model, image_url):
    total_vehicles = 0
    temp_image_path = "temp_file.jpg"
    try:
        image_response = requests.get(image_url,stream=True)
        with open(temp_image_path,'wb') as fh:
            image_response.raw.decode_content = True
            shutil.copyfileobj(image_response.raw, fh)
        result = inference_detector(model, temp_image_path)
        os.remove(temp_image_path)
        for classes in list(result):
            if list(classes):
                total_vehicles += len(list(classes))
        return total_vehicles
    except Exception as e:
        print(e)
        return -1

config_file = 'cnn_model/cascade_rcnn_r50_fpn_20e_coco.py'
checkpoint_file = 'cnn_model/cascade_rcnn_r50_fpn_20e_coco.pth'
model = init_detector(config_file, checkpoint_file, device='cuda:0')
images = get_historical_images(date, time, lat, lon)
vehicles_count = [detect_vehicles(model,images[x]) for x in images]

## RunRNN Model to predict car count from last 7 days data (car count)

In [None]:
from rnn_model.lstm_model import LSTM

model = LSTM(input_dim=1,hidden_dim=4,num_layers=32,output_dim=1)
model.load_state_dict(torch.load('rnn_model/rnn_model.pth'))
model.eval()
inputs = np.asarray([vehicles_count],dtype=np.float32)
inputs = np.expand_dims(inputs,2)
car_prediction = model(torch.from_numpy(inputs))

In [None]:
print(list(car_prediction))