<img src="images/logo_city.png" align="right" width="20%">

# Data Generation Module Development

## 1. Goal

The module should be something like below:

In [None]:
import timeslice                    
import timeslice.source as source   
import timeslice.rule  as rule      
import timeslice.worker as worker
import timeslice.viz as viz # to be done

import torch

In [None]:
# set time split rule for dataset generation
time_rule = rule.TimeSlice(stp='2017-05-01 00:00:00', etp='2017-05-02 00:00:00', freq='10min')

# connect to database source
taxi_db = source.DatabaseSource('cleaned_small_yellow_2017_full', time_rule)

# initialize worker
data_worker = worker.Worker(source=taxi_db, destin='data_test/monthly_data/may', rule=time_rule, viz=True)

# generate dataset
# data_worker.generate()

In [None]:
import pandas as pd
pd.date_range('2017-05-01 00:00:00', '2017-06-01 00:00:00', freq='D')

In [1]:
import re
import pandas as pd
import pprint
import psycopg2

def _construct_sql(stp:str, etp:str):
        '''
        A private helper function to construct sql query, called another
        helper function _construct_split.

        Args:
            stp: datetime string, starting time point of a concurrent unit
            etp: datatime string, end time point of a concurrent unit

        Returns:
            sql: a constructed query string
        '''
        pattern = re.compile(
            '^([0-9]{4})-([0-1][0-9])-([0-3][0-9])\s([0-1][0-9]|[2][0-3]):([0-5][0-9]):([0-5][0-9])$'
        )
        assert pattern.match(stp) and pattern.match(etp)

        return (f"select tripid,tpep_pickup_datetime,tpep_dropoff_datetime,pulocationid,dolocationid from cleaned_small_yellow_2017_full "
                f"where tpep_pickup_datetime >= '{stp}' and tpep_dropoff_datetime < '{etp}';")
    


bounds = pd.date_range('2017-05-01 00:00:00', '2017-06-01 00:00:00', freq='D')
subs = list(zip(bounds[:-1], bounds[1:]))
queries = {}

host = 'localhost'
dbname = 'taxi'
user = 'postgres'

conn = psycopg2.connect(f'host={host} dbname={dbname} user={user}')
cursor = conn.cursor()

# create sub interval queries
for i, sub in enumerate(subs):
    stp, etp = list(map(str, sub))
    queries[i] = _construct_sql(stp, etp)

dataframes = {}
        
for i, query in queries.items():
    # dataframes[i] = pd.read_sql_query(query, conn)
    cursor.execute(query)

In [2]:
import re
import pandas as pd
import pprint
import psycopg2
from threading import Thread
from multiprocessing import Process
from queue import Queue

def _construct_sql(stp:str, etp:str):
        '''
        A private helper function to construct sql query, called another
        helper function _construct_split.

        Args:
            stp: datetime string, starting time point of a concurrent unit
            etp: datatime string, end time point of a concurrent unit

        Returns:
            sql: a constructed query string
        '''
        pattern = re.compile(
            '^([0-9]{4})-([0-1][0-9])-([0-3][0-9])\s([0-1][0-9]|[2][0-3]):([0-5][0-9]):([0-5][0-9])$'
        )
        assert pattern.match(stp) and pattern.match(etp)

        return (f"select tripid,tpep_pickup_datetime,tpep_dropoff_datetime,pulocationid,dolocationid from cleaned_small_yellow_2017_full "
                f"where tpep_pickup_datetime >= '{stp}' and tpep_dropoff_datetime < '{etp}';")
    


def concurrent_read(id:int, df_pool:dict, query:str, conn):
    cursor = conn.cursor()
    cursor.execute(query)
    
    
bounds = pd.date_range('2017-05-01 00:00:00', '2017-06-01 00:00:00', freq='D')
subs = list(zip(bounds[:-1], bounds[1:]))
queries = {}

host = 'localhost'
dbname = 'taxi'
user = 'postgres'
thread_pool = Queue()
df_pool = Queue()

conn = psycopg2.connect(f'host={host} dbname={dbname} user={user}')
# create sub interval queries
for i, sub in enumerate(subs):
    stp, etp = list(map(str, sub))
    queries[i] = _construct_sql(stp, etp)


for i, query in queries.items():
    t = Thread(target=concurrent_read, args=(i, df_pool, query, conn))
    thread_pool.put(t)
    t.start()


while not thread_pool.empty():
    p = thread_pool.get()
    p.join()

In [15]:
# define function to return weekly-sliced time intervals
def weekly_divide(stp:str, etp:str):
    
    bounds = pd.date_range(stp, etp, freq='1W-MON')
    print(bounds[0], bounds[-1])
    head_round = tail_round = None
    if bounds[0] != stp:
        head_round = (pd.Timestamp(stp), pd.Timestamp(bounds[0]))
    if bounds[-1] != etp:
        tail_round = (pd.Timestamp(bounds[-1]), pd.Timestamp(etp))
    print('\n\n\n')
    subs = [head_round] + list(zip(bounds[:-1], bounds[1:])) + [tail_round]
    
    return subs

In [16]:
weekly_divide('2017-01-01 00:00:00', '2017-06-01 00:00:00')

2017-01-02 00:00:00 2017-05-29 00:00:00


NameError: name 'Timestamp' is not defined

In [12]:
pd.Timestamp('2017-01-01 00:00:00', freq='W-MON')

Timestamp('2017-01-01 00:00:00', freq='W-MON')

In [14]:
[1] + [2,3,4]

[1, 2, 3, 4]