<a href="https://colab.research.google.com/github/patrickspencer/user_timestamp_query_exercise/blob/main/user_timestamp_query_exercise.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Data classes Without

In [475]:
import pandas as pd
from collections import defaultdict

def read_file(f):
    try:
        df = pd.read_csv(f, sep='\t', 
                         dtype = {'user': str, 'timestamp': int, 'query': str})
        # columns: user, timestamp, text
        return [(r[0], r[1], r[2]) for _, r in df.iterrows()]
    except FileNotFoundError:
        print(f"File {f} not found.")


def parse(inputs):
    # dict: user -> list of sessions
    sessions = defaultdict(list)

    for i in inputs:
        u, ts, t = i # user, timestamp, text
        q = (ts, t) # query
        s = sessions[u] # session list
        
        if s and ts - s[-1][-1][0] <= 3*60:
            s[-1].append(q)
        else:
            # s[-1][-1][0] always exists
            s.append([q])

    return sessions


In [476]:
# test read_file function
file_name = 'user_timestamp_query_data.tsv'
inputs = read_file(file_name)
inputs

[('u1', 123, 'q_u1_a'),
 ('u2', 1000, 'q_u2_a'),
 ('u2', 1001, 'q_u2_b'),
 ('u1', 124, 'q_u1_b'),
 ('u1', 400, 'q_u1_c'),
 ('u3', 2000, 'q_u3_a')]

In [None]:
# test parse function
inputs = [
    ('u1', 123, 'q_u1_a'), 
    ('u2', 1000, 'q_u2_a'), 
    ('u2', 1001, 'q_u2_b'), 
    ('u1', 124, 'q_u1_b'), 
    ('u1', 400, 'q_u1_c'), 
    ('u3', 2000, 'q_u3_a'),
    ('u3', 3000, 'q_u3_b')
]
parse(inputs)

defaultdict(list,
            {'u1': [[(123, 'q_u1_a'), (124, 'q_u1_b')], [(400, 'q_u1_c')]],
             'u2': [[(1000, 'q_u2_a'), (1001, 'q_u2_b')]],
             'u3': [[(2000, 'q_u3_a')], [(3000, 'q_u3_b')]]})

In [None]:
# putting it together
file_name = 'user_timestamp_query_data.tsv'
inputs = read_file(file_name)
parse(inputs)

defaultdict(list,
            {'u1': [[(123, 'q_u1_a'), (124, 'q_u1_b')], [(400, 'q_u1_c')]],
             'u2': [[(1000, 'q_u2_a'), (1001, 'q_u2_b')]],
             'u3': [[(2000, 'q_u3_a')]]})

# With classes

In [None]:
from collections import defaultdict
from dataclasses import dataclass, field
from typing import List, TypedDict

In [None]:
@dataclass
class Query:
    user: str
    ts: int
    text: str

@dataclass
class Session:
    queries: List[Query] = field(default_factory=lambda: [])

    @property
    def last_query(self):
        return self.queries[-1] if self.queries else None

    def add(self, query):
        self.queries.append(query)

@dataclass
class SessionList:
    sessions: List[Session] = field(default_factory=lambda: [])

    @property
    def last_session(self):
        # last session in session list
        return self.sessions[-1] if self.sessions else None
    
    @property
    def last_query(self):
        # last query in last session
        return self.last_session.last_query if self.last_session else None
    
    @property
    def last_ts(self):
        # last timestamp in last query
        return self.last_query.ts if self.last_query else None
    
    def add_new_session_from_query(self, query):
        """
        Wrap query object in session wrapper and add to sessions list
        """
        new_query_list = [query]
        new_session_list = Session(new_query_list)
        self.sessions.append(new_session_list)
    
    def add(self, query):
        if not self.sessions:
            self.add_new_session_from_query(query)
        elif self.last_ts:
            if query.ts - self.last_ts < 3*60:
                self.last_session.add(query)
            else:
                self.add_new_session_from_query(query)
        
@dataclass
class UserSessions(TypedDict):
    user: str
    sessions: SessionList

In [None]:
input = ('u1', 123, 'qA')
q = Query(*input)

In [None]:
s = SessionList()
s

SessionList(sessions=[])

In [None]:
s.add_new_session_from_query(q)

In [None]:
s

SessionList(sessions=[Session(queries=[Query(user='u1', ts=123, text='qA')])])

In [None]:
input = ('u1', 456, 'qB')
user, ts, text = input
q2 = Query(*input)
s.add_new_session_from_query(q2)

In [None]:
s

SessionList(sessions=[Session(queries=[Query(ts=123, text='qA')])])

In [None]:
input = ('u1', 456, 'qB')
q3 = Query(*input)
s.add(q3)
s

SessionList(sessions=[Session(queries=[Query(user='u1', ts=123, text='qA')]), Session(queries=[Query(user='u1', ts=456, text='qB'), Query(user='u1', ts=456, text='qB')])])

In [None]:
input = ('u1', 1000, 'qB')
q4 = Query(*input)
s.add(q4)
s

SessionList(sessions=[Session(queries=[Query(user='u1', ts=123, text='qA')]), Session(queries=[Query(user='u1', ts=456, text='qB'), Query(user='u1', ts=456, text='qB')]), Session(queries=[Query(user='u1', ts=1000, text='qB'), Query(user='u1', ts=1000, text='qB'), Query(user='u1', ts=1000, text='qB')])])

In [None]:
all_user_sessions = UserSessions()
inputs = [('u1', 123, 'qA'), ('u2', 1000, 'qB'), ('u1', 124, 'qC'), ('u1', 2000, 'qC')]

In [None]:
all_user_sessions = UserSessions()
inputs = [('u1', 123, 'qA'), ('u2', 1000, 'qB'), ('u1', 124, 'qC'), ('u1', 2000, 'qC')]
for input in inputs:
    user, ts, text = input
    q = Query(ts, text)
    if user not in all_user_sessions:
        all_user_sessions[user] = SessionList()
    all_user_sessions[user].add(q)
all_user_sessions

{'u1': SessionList(sessions=[Session(queries=[Query(ts=123, text='qA'), Query(ts=124, text='qC')]), Session(queries=[Query(ts=2000, text='qC')])]),
 'u2': SessionList(sessions=[Session(queries=[Query(ts=1000, text='qB')])])}