In [1]:
from datetime import datetime, timedelta
import numpy as np
from tqdm import tqdm
from pathlib import Path
import pandas as pd
import os

from collections import Counter

## Analyse data

In [2]:
# Load the CSV tables from ../data/local/tables/ into one DataFrame, `df_sum`.
#
# Every file is read into its own frame and the frames are concatenated once
# at the end.  The previous version called `df_sum.append(...)` and threw the
# result away, so only the first file was ever kept (and `DataFrame.append`
# no longer exists in pandas >= 2.0).
#
# `Path.iterdir()` yields entries in arbitrary order, so the listing is sorted
# to make the `[:1]` selection (only the first file is loaded) reproducible.
# `df_sum` stays `None` when the directory is empty, as before.
table_frames = []
for df_path in tqdm(sorted(Path("../data/local/tables/").iterdir())[:1]):
    table_frames.append(pd.read_csv(df_path, encoding='utf-8', low_memory=False))

df_sum = pd.concat(table_frames, ignore_index=True) if table_frames else None

100%|██████████| 1/1 [00:00<00:00,  1.42it/s]


In [5]:
# All entries of the tables directory.  Path.iterdir() yields them in
# arbitrary, OS-dependent order, so sort to keep the list reproducible.
file_path_list = sorted(Path("../data/local/tables/").iterdir())

In [7]:
# Show each file name with its first 9 and last 4 characters removed.
# NOTE(review): presumably this strips a fixed-width prefix and the ".csv"
# suffix, leaving the date tag seen in the output — confirm against the real
# file names before relying on the hard-coded offsets.
[p.name[9:-4] for p in file_path_list]

['01_OCT_2022',
 '02_OCT_2022',
 '03_OCT_2022',
 '04_OCT_2022',
 '05_OCT_2022',
 '05_SEP_2022',
 '06_OCT_2022',
 '06_SEP_2022',
 '07_OCT_2022',
 '07_SEP_2022',
 '08_OCT_2022',
 '08_SEP_2022',
 '09_SEP_2022',
 '10_SEP_2022',
 '11_SEP_2022',
 '12_SEP_2022',
 '13_SEP_2022',
 '14_SEP_2022',
 '15_SEP_2022',
 '16_SEP_2022',
 '17_OCT_2022',
 '17_SEP_2022',
 '18_SEP_2022',
 '19_SEP_2022',
 '20_SEP_2022',
 '21_SEP_2022',
 '22_SEP_2022',
 '23_SEP_2022',
 '24_SEP_2022',
 '25_SEP_2022',
 '26_SEP_2022',
 '27_SEP_2022',
 '28_SEP_2022',
 '29_SEP_2022',
 '30_SEP_2022']

In [4]:
# df_sum[df_sum.duplicated()]

In [10]:
from datetime import datetime, timedelta
import pytz

import json
import requests
import time

from PTETA.utils.transport.TransportOperator import TransportOperator
from PTETA.utils.transport.TransportRoute import TransportRoute
from PTETA.utils.transport.TransportVehicle import TransportVehicle
from PTETA.utils.transport.TransportAVLData import TransportAVLData
from psycopg2.extensions import connection as Connection
import psycopg2

from apscheduler.schedulers.background import BackgroundScheduler
from PTETA.utils.transport.BaseDBAccessDataclass import BaseDBAccessDataclass
from typing import List, Union
from PTETA.utils.transport.BaseDBAccessDataclass import BaseDBAccessDataclass

In [24]:
# Open the PostgreSQL connection used by the test cells below.
# Host and password come from the RDS_HOSTNAME / RDS_PTETA_DB_PASSWORD
# environment variables; a missing variable raises KeyError here.
conn = psycopg2.connect(
    host=os.environ['RDS_HOSTNAME'],
    database="pteta_db",
    user="postgres",
    password=os.environ['RDS_PTETA_DB_PASSWORD'])

In [11]:
# Previous tracker response; TransGPSCVMonitor.request_data() reads and
# replaces it through `global response_prev`.
response_prev = dict()

# Remove a class definition left over from an earlier run of this cell.
# On a fresh kernel the name does not exist yet, which raises NameError;
# only that case is ignored so any other problem is not silently swallowed.
try:
    del TransGPSCVMonitor
except NameError:
    pass

class TransGPSCVMonitor:
    """Poll the trans-gps.cv.ua tracker endpoint and store the data in PostgreSQL.

    The monitor holds in-memory copies of the operator, route and vehicle
    tables (loaded through each class's ``get_table``).  ``get_new_objs`` uses
    them to detect objects that are not known yet and ``write_to_db`` uses
    them to map vehicles and routes to their ids.

    NOTE(review): ``request_data`` keeps the previous response in the
    module-level ``response_prev`` variable, so that state is shared by every
    instance in the process.
    """
    db_connection: Connection = None

    # object -> id lookup tables; rebound per instance by the reload_*() methods.
    operator_to_id: dict = dict()
    route_to_id: dict = dict()
    vehicle_to_id: dict = dict()

    # Class-level default kept for backward compatibility.  __init__ replaces
    # it with a per-instance dict so instances do not overwrite each other.
    objects_unique = {
        TransportOperator: set(),
        TransportRoute: set(),
        TransportVehicle: set()
    }

    def __init__(self, connection_config: dict, **kwarg: dict) -> None:
        """Connect to the database and load the reference tables.

        Args:
            connection_config: keyword arguments forwarded to
                ``psycopg2.connect``.
            **kwarg: optional settings ``REQUEST_URI``, ``START_DATE``,
                ``END_DATE`` and ``REQ_TIME_DELTA``; each falls back to the
                default shown below when absent.
        """
        self.db_connection = psycopg2.connect(**connection_config)

        # Own copy of the cache: assigning into the class-level dict through
        # `self` would modify state shared by all instances.
        self.objects_unique = {
            TransportOperator: set(),
            TransportRoute: set(),
            TransportVehicle: set()
        }

        self.reload_operators()
        self.reload_routes()
        self.reload_vehicles()

        self.datetime_format = '%Y-%m-%d %H:%M:%S'

        self.REQUEST_URI = \
            kwarg.get('REQUEST_URI', 'http://www.trans-gps.cv.ua/map/tracker/?selectedRoutesStr=')
        self.START_DATE = \
            kwarg.get('START_DATE', (datetime.now() - timedelta(days=1)).strftime(self.datetime_format))
        self.END_DATE = \
            kwarg.get('END_DATE', (datetime.now() + timedelta(days=30)).strftime(self.datetime_format))
        self.REQ_TIME_DELTA = kwarg.get('REQ_TIME_DELTA', 1.1)

    def reload_operators(self):
        """Re-read the operator table and rebuild the operator caches."""
        operator_list = TransportOperator.get_table(self.db_connection)
        self.objects_unique[TransportOperator] = set(operator_list)
        self.operator_to_id = {operator: operator.id for operator in operator_list}

    def reload_routes(self):
        """Re-read the route table and rebuild the route caches."""
        route_list = TransportRoute.get_table(self.db_connection)
        self.objects_unique[TransportRoute] = set(route_list)
        self.route_to_id = {route: route.id for route in route_list}

    def reload_vehicles(self):
        """Re-read the vehicle table and rebuild the vehicle caches."""
        vehicle_list = TransportVehicle.get_table(self.db_connection)
        self.objects_unique[TransportVehicle] = set(vehicle_list)
        self.vehicle_to_id = {vehicle: vehicle.id for vehicle in vehicle_list}

    @classmethod
    def request_data(cls, request_uri='http://www.trans-gps.cv.ua/map/tracker/?selectedRoutesStr=',
                     timeout=10.0):
        """Fetch the tracker state and return only the records that changed.

        A record is returned when its key was absent from the previous
        response, or when its ``'gpstime'`` differs from the previous one.
        Every returned record gets a ``'response_datetime'`` entry holding the
        timezone-aware UTC time of this call.  The module-level
        ``response_prev`` is replaced by the new response on success.

        Args:
            request_uri: URL to GET; the body is parsed as JSON.
            timeout: seconds passed to ``requests.get``.  Without a timeout
                the call may block indefinitely and ``requests.Timeout``
                could never be raised.

        Returns:
            A list of record dicts.  It is empty when nothing changed, when
            the response had no body, or when a network error was caught
            (the error is printed).
        """
        global response_prev

        dt_now = datetime.now()
        dt_tz_now = datetime.now(tz=pytz.utc)

        # Bound before the try block so the final return is valid even when
        # the request fails (previously this raised UnboundLocalError).
        optimized_data_list = list()

        try:
            request = requests.get(request_uri, timeout=timeout)
            if (request is None) or (request.text is None):
                return optimized_data_list
            response_cur = json.loads(request.text)

            keys_prev = set(response_prev.keys())
            keys_cur = set(response_cur.keys())

            # Known devices: keep only those whose GPS timestamp moved.
            for imei in keys_prev.intersection(keys_cur):
                if response_prev[imei]['gpstime'] != response_cur[imei]['gpstime']:
                    response_cur[imei]['response_datetime'] = dt_tz_now
                    optimized_data_list.append(response_cur[imei])

            # Devices seen for the first time are always reported.
            for imei in keys_cur.difference(keys_prev):
                response_cur[imei]['response_datetime'] = dt_tz_now
                optimized_data_list.append(response_cur[imei])

            response_prev = response_cur
        except (requests.Timeout, requests.ConnectionError, requests.HTTPError) as err:
            print(f"{dt_now.strftime('%Y-%m-%d %H;%M;%S')} : error while trying to GET data\n"
                  f"\t{err}\n")

        return optimized_data_list

    def get_new_objs(self, obj_list: List[BaseDBAccessDataclass]) -> List[BaseDBAccessDataclass]:
        """Return the distinct objects of ``obj_list`` that are not in the cache.

        Duplicates are removed through ``set``, so the order of the result is
        not defined.
        """
        unique_obj_list = list(set(obj_list))
        return [obj for obj in unique_obj_list
                if obj not in self.objects_unique[obj.__class__]]

    def update_db(self, obj_list: List[BaseDBAccessDataclass]):
        """Insert the objects of ``obj_list`` that the database does not hold yet.

        The class of the first element selects the table, so all elements are
        expected to be of the same class.  An empty list is a no-op.
        """
        if not obj_list:
            return

        current_class = obj_list[0].__class__
        are_in_db_list = current_class.are_in_table(self.db_connection, obj_list)

        obj_to_insert = [obj
                         for obj, is_in in zip(obj_list, are_in_db_list)
                         if not is_in]

        current_class.insert_many_in_table(self.db_connection, obj_to_insert)

    @classmethod
    def decompose_response(cls, response: List[dict]) -> tuple:
        """Split response rows into parallel lists of domain objects.

        Returns:
            ``(operator_list, route_list, vehicle_list, avl_data_list)``;
            element ``i`` of each list is built from ``response[i]``.
        """
        operator_list, route_list = list(), list()
        vehicle_list, avl_data_list = list(), list()

        for row in response:
            operator_list.append(TransportOperator.from_response_row(row))
            route_list.append(TransportRoute.from_response_row(row))
            vehicle_list.append(TransportVehicle.from_response_row(row))
            avl_data_list.append(TransportAVLData.from_response_row(row))

        return operator_list, route_list, vehicle_list, avl_data_list

    def write_to_db(self, response):
        """Persist a response: new operators/routes/vehicles first, then AVL rows.

        New reference objects are inserted and the matching cache is
        reloaded, so the id lookups below can resolve them.
        """
        operator_list, route_list, vehicle_list, avl_data_list = self.decompose_response(response)

        # Nothing was received, so there is nothing to insert.
        if not avl_data_list:
            return

        for obj_list in [operator_list, route_list, vehicle_list]:
            new_obj = self.get_new_objs(obj_list)
            if new_obj:
                print(f"There are {len(new_obj)} new {new_obj[0].__class__} to inserted in DB")
                self.update_db(new_obj)
                if isinstance(new_obj[0], TransportOperator):
                    self.reload_operators()
                elif isinstance(new_obj[0], TransportRoute):
                    self.reload_routes()
                elif isinstance(new_obj[0], TransportVehicle):
                    self.reload_vehicles()

        # The lists are parallel: row i of the response produced element i of each.
        for avl_data, vehicle, route in zip(avl_data_list, vehicle_list, route_list):
            avl_data.vehicle_id = self.vehicle_to_id[vehicle]
            avl_data.route_id = self.route_to_id[route]

        TransportAVLData.insert_many_in_table(self.db_connection, avl_data_list)

    def run(self):
        """Poll the tracker every ``REQ_TIME_DELTA`` seconds until ``END_DATE``.

        Blocks the calling thread, printing the current time every 10
        seconds, until interrupted with Ctrl-C.

        NOTE(review): the list returned by ``request_data`` is discarded by
        the scheduler, so nothing is written to the database from here, and
        ``START_DATE`` is not passed to the job — confirm whether that is
        intended.
        """
        scheduler = BackgroundScheduler(job_defaults={'max_instances': 8})
        scheduler.add_job(
            self.request_data,
            'interval',
            # Use the configured endpoint instead of the method's default.
            kwargs={'request_uri': self.REQUEST_URI},
            seconds=self.REQ_TIME_DELTA,
            end_date=self.END_DATE,
            id='listener')

        scheduler.start()

        try:
            print('Scheduler started!')
            while True:
                time.sleep(10)
                print(datetime.now())
        except KeyboardInterrupt:
            if scheduler.state:
                scheduler.shutdown()

In [12]:
# Database settings for the monitor; host and password are taken from the
# environment so no credentials live in the notebook.
connection_config = {
    'host': os.environ['RDS_HOSTNAME'],
    'database': "pteta_db",
    'user': "postgres",
    'password': os.environ['RDS_PTETA_DB_PASSWORD'],
}

monitor = TransGPSCVMonitor(connection_config)

# One polling round; the cell displays how many records were returned.
req = monitor.request_data()
len(req)

87

In [15]:
# Preview how df_sum splits into consecutive batches of `batch_size` rows:
# row i goes to batch i // batch_size.  Prints batches 0..3, then stops.
batch_size = 10
batch_index = np.arange(len(df_sum)) // batch_size
for batch_number, batch_df in df_sum.groupby(batch_index):
    print(batch_number)
    print(batch_df)
    if batch_number > 2:
        break

0
    id             imei  name stateCode stateName        lat        lng  \
0  371  355227046451266   H53      used      used  48.265122  25.989670   
1  371  355227046451266   H53      used      used  48.265010  25.989862   
2  371  355227046451266   H53      used      used  48.264903  25.990107   
3  243  355227045600988  A158      used      used  48.266835  25.992173   
4  371  355227046451266   H53      used      used  48.264917  25.990410   
5  383  355227046447371   H65      used      used  48.265955  25.991667   
6  371  355227046451266   H53      used      used  48.265127  25.990710   
7  371  355227046451266   H53      used      used  48.265295  25.990962   
8  386  355227046451407   H68      used      used  48.265908  25.989527   
9  371  355227046451266   H53      used      used  48.265415  25.991173   

   speed  orientation              gpstime  routeId routeName routeColour  \
0    5.3       125.83  2022-09-30 23:00:05       37         T       coral   
1    7.4       126

## TransportRoute

In [7]:
TransportRoute.__insert_columns__().replace('"', '').split(', ')[1:]

['route_name', 'route_colour']

In [28]:
# Build one TransportRoute per distinct (routeId, routeName, routeColour)
# row of df_sum and insert them all into the route table.
cols = ["routeId", 'routeName', 'routeColour']
unique_route_rows = df_sum[cols].drop_duplicates().to_dict('records')
route_list = list(map(TransportRoute.from_response_row, unique_route_rows))
TransportRoute.insert_many_in_table(conn, route_list)

In [32]:
TransportRoute.get_table(conn)

[TransportRoute(id=37, name='T', colour='coral'),
 TransportRoute(id=31, name='6/6a', colour='deeppink'),
 TransportRoute(id=21, name='38', colour='deeppink'),
 TransportRoute(id=41, name='10A', colour='black'),
 TransportRoute(id=20, name='A', colour='navy'),
 TransportRoute(id=42, name='39', colour='coral'),
 TransportRoute(id=23, name='19', colour='teal'),
 TransportRoute(id=19, name='10', colour='black'),
 TransportRoute(id=11, name='9', colour='magenta'),
 TransportRoute(id=2, name='11', colour='green'),
 TransportRoute(id=6, name='4', colour='magenta'),
 TransportRoute(id=27, name='3/3a', colour='green'),
 TransportRoute(id=16, name='2', colour='green'),
 TransportRoute(id=4, name='5', colour='orange'),
 TransportRoute(id=3, name='12', colour='blue'),
 TransportRoute(id=12, name='20', colour='maroon'),
 TransportRoute(id=7, name='6', colour='sienna'),
 TransportRoute(id=10, name='34', colour='navy'),
 TransportRoute(id=45, name='1', colour='navy'),
 TransportRoute(id=9, name='27'

In [13]:
# df_sum[TransportRoute.__insert_columns__().replace('"', '').split(', ')[1:]].value_counts()

In [27]:
conn.rollback()

In [45]:
# Join the per-object SQL statements into one space-separated request string.
# NOTE(review): `create_sql_req` and `trans_vehicle_list` are not defined in
# the visible part of this notebook — this cell presumably relies on leftover
# kernel state and would fail after Restart & Run All; confirm or remove.
SQL_big_req = " ".join([create_sql_req(t) for t in trans_vehicle_list])

In [29]:
route_list = TransportRoute.get_table(conn)[:]
len(route_list)

31

In [30]:
# Shift the id of every second route (indices 0, 2, 4, ...) by one, then ask
# the DB which of the routes it contains.
# NOTE: this mutates the objects in route_list in place, so re-running the
# cell shifts the ids again.
for r in route_list[::2]: 
    r.id += 1
TransportRoute.are_in_table(conn, route_list)

[False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False]

In [31]:
TransportRoute.are_in_table(conn, route_list)

[False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False,
 True,
 False]

In [14]:
# Derive routes with new names and ids from the loaded ones: append "_" to
# the name and add 100 to the id.
# NOTE: in-place and not idempotent — each re-run appends another "_" and
# adds another 100.
for r in route_list: 
    r.name += "_"
    r.id += 100

In [16]:
# sql = f"""INSERT INTO pteta.route("id", "routeName", "routeColour") VALUES """ + \
#               ", ".join([f"""({obj.id}, '{obj.name}', '{obj.colour}')"""
#                         for obj in route_list]) + ";"
# sql

In [17]:
# route_list

In [18]:
TransportRoute.insert_many_in_table(conn, route_list)

## Test TransportVehicle

In [33]:
TransportVehicle.__insert_columns__()

'"imei", "name", "bus_number", "remark", "perev_id"'

In [45]:
# Build one TransportVehicle per distinct combination of the vehicle columns
# in df_sum and insert them all into the vehicle table.
cols = ["imei", "name", "busNumber", "remark", "perevId"]
unique_vehicle_rows = df_sum[cols].drop_duplicates().to_dict('records')
vehicle_list = list(map(TransportVehicle.from_response_row, unique_vehicle_rows))
TransportVehicle.insert_many_in_table(conn, vehicle_list)

In [47]:
vehicle_list = TransportVehicle.get_table(conn)
len(vehicle_list)

99

In [48]:
TransportVehicle.are_in_table(conn, vehicle_list[:5])

[True, True, True, True, True]

In [49]:
vehicle_list[5].is_in_table(conn)

True

In [50]:
# Change the imei of the first vehicle and check that the modified object is
# no longer reported as present in the table.
# NOTE: `obj` is the same object as vehicle_list[0], so the list element is
# modified too; re-running appends another "-".
obj = vehicle_list[0]
obj.imei += "-"
obj.is_in_table(conn)

False

In [51]:
obj.insert_in_table(conn)

In [52]:
# Append "_" to the imei of the first ten vehicles (in place; each re-run
# appends one more "_").
for v in vehicle_list[:10]: 
    v.imei += '_'

In [15]:
TransportVehicle.insert_many_in_table(conn, vehicle_list[:10])

In [31]:
obj.insert_in_table(conn)
obj.is_in_table(conn)

In [14]:
# Clear the id of the first vehicle, then let update_id_from_table() fill it
# in again; the two prints show the object before and after the call.
vehicle_list[0].id = None
print(vehicle_list[0])
vehicle_list[0].update_id_from_table(conn)
print(vehicle_list[0])

TransportVehicle(id=None, imei='355227045600830', name='A178', busNumber='310', remark='Тролейбус 310 DNSNK', perevId=6, routeId=37)
TransportVehicle(id=1, imei='355227045600830', name='A178', busNumber='310', remark='Тролейбус 310 DNSNK', perevId=6, routeId=37)


In [34]:
# Append "__" to the imei of the first ten vehicles (in place; not
# idempotent), then display the first 13 to compare modified and untouched
# entries.
for v in vehicle_list[:10]: 
    v.imei += '__'

vehicle_list[:13]

[TransportVehicle(id=222, imei='355227045600830-__', name='A178', busNumber='310', remark='Тролейбус 310 DNSNK', perevId=6, routeId=37),
 TransportVehicle(id=2, imei='355227046451662__', name='H76', busNumber='350', remark='Тролейбус 350 DNSNTNK', perevId=6, routeId=31),
 TransportVehicle(id=3, imei='355227045369527__', name='A6', busNumber='3557', remark='3557 DNSNK ', perevId=12, routeId=21),
 TransportVehicle(id=4, imei='355227045540176__', name='A83', busNumber='5150', remark=' 5150 DNS', perevId=7, routeId=41),
 TransportVehicle(id=5, imei='355227046453387__', name='H75', busNumber='3627', remark='3627 DNSNTNK', perevId=1, routeId=20),
 TransportVehicle(id=6, imei='355228042084283__', name='A207', busNumber='1032', remark='1032 DNSNK', perevId=13, routeId=42),
 TransportVehicle(id=7, imei='355227045371655__', name='A1', busNumber='6513', remark='6513 DNSNK', perevId=7, routeId=23),
 TransportVehicle(id=8, imei='355227046451407__', name='H68', busNumber='0855', remark='0855 DNSNTNK

## TransportOperator

In [41]:
operator_list = TransportOperator.get_table(conn)
len(operator_list)

6

In [37]:
TransportOperator.__insert_columns__()

'id, "perev_name"'

In [40]:
# Build one TransportOperator per distinct (perevId, perevName) row of df_sum
# and insert them all into the operator table.
cols = ['perevId', "perevName"]
unique_operator_rows = df_sum[cols].drop_duplicates().to_dict('records')
operator_list = list(map(TransportOperator.from_response_row, unique_operator_rows))
TransportOperator.insert_many_in_table(conn, operator_list)

In [42]:
TransportOperator.are_in_table(conn, operator_list[:])

[True, True, True, True, True, True]

In [43]:
operator_list[0].is_in_table(conn)

True

In [44]:
# Give the first operator a new id (+100) and a prefixed name, then check
# that the modified object is not reported as present in the table.
# NOTE: `obj` is the same object as operator_list[0]; the change is in place
# and accumulates on every re-run.
obj = operator_list[0]
obj.id += 100
obj.name = "-_-" + obj.name
obj.is_in_table(conn)

False

In [14]:
obj.insert_in_table(conn)

In [15]:
# Derive operators with new ids (+100) and prefixed names from every loaded
# operator, then insert them in one call.
# NOTE: in-place and not idempotent — each re-run adds another 100 and
# another "-_-" prefix before inserting.
for op in operator_list: 
    op.id += 100
    op.name = "-_-" + op.name
    
TransportOperator.insert_many_in_table(conn, operator_list)

## TransportAVLData

In [9]:
avl_data_list = TransportAVLData.get_table(conn)

In [11]:
# avl_data_list

In [14]:
%%timeit
TransportAVLData.are_in_table(conn, avl_data_list)

57.4 ms ± 3.36 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [16]:
%%timeit
avl_data_list[0].is_in_table(conn)

42.1 ms ± 1.17 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
