# Introduction to Big Data Modern Technologies course

## TOPIC 1: Relational databases practice - explore Jupyter users' activities
### Part 1

### 1. Libraries

In [None]:
!pip install pandas

In [None]:
import os
import sys
import psycopg2
import datetime
import pandas as pd

pd.set_option('display.max_columns', None)

### 2. Raw data

In [None]:
!ls -la ~/__DATA/IBDT_Spring_2023/topic_1/

In [None]:
# the raw file is semicolon-separated; `index_col=False` tells pandas
# not to use the first column as the index
df = pd.read_csv('~/__DATA/IBDT_Spring_2023/topic_1/jhub_logs.csv', sep=';', index_col=False)
df.info()

In [None]:
df.head()

In [None]:
df.describe().T

### 3. Extract logs of the system

JupyterHub works within a Kubernetes cluster, so we extract not only Jupyter logs, but also Kubernetes logs.

#### 3.1. Kubernetes logs

In [None]:
df.kubernetes[0]

In [None]:
def row_info(rin):
    """
    Extracts from a Kubernetes metadata string the names of:
      - docker image (value after `container_image=`)
      - id of the Jupyter application (value after `pod_name=`)
      - name of the host, where Jupyter runs (value after `host=`)

    Each value is the text between the first pair of single quotes
    that follows the first occurrence of the key text. If the key is
    absent, has no quoted value, or the input is not a string
    (e.g. NaN from an empty CSV cell), an empty string is returned
    for that field instead of raising.

    Returns a tuple `(img, hub, host)`.

    """
    def quoted_value(key):
        # non-string cells carry no fields at all
        if not isinstance(rin, str):
            return ''
        pos = rin.find(key)
        # `find` returns -1 when the key is missing; slicing with -1
        # would silently look at the last character of the string
        if pos == -1:
            return ''
        parts = rin[pos:].split('\'')
        return parts[1] if len(parts) > 1 else ''

    img = quoted_value('container_image=')
    hub = quoted_value('pod_name=')
    host = quoted_value('host=')
    return img, hub, host

In [None]:
row_info(df.kubernetes[0])

#### 3.2. JupyterHub logs

In [None]:
df.log[0]

In [None]:
def sq_brackets(sin):
    """
    Splits a log string and extracts:
      - head (first token inside the square brackets)
      - timestamp of the event (second and third tokens)
      - name of application (fourth token)
      - type of logs (fifth token, part before the first `:`)
      - code of event (fifth token, part after the first `:`)
      - description (text after the closing bracket, stripped)

    The header is the text between the first `[` and the next `]`,
    wherever the `[` is located in the string.

    If the string cannot be parsed (not a string, no `[`, fewer than
    five header tokens, or no `:` in the fifth token), the five header
    fields are empty strings and the input is returned unchanged as
    the description.

    Returns a tuple `(head, ts, svc, typ, code, msg)`.

    """
    unparsed = ('', '', '', '', '', sin)
    if not isinstance(sin, str):
        return unparsed

    _, bracket, rest = sin.partition('[')
    if not bracket:
        return unparsed

    # message starts right after the closing bracket, regardless of
    # how much text precedes the opening one
    header, _, tail = rest.partition(']')
    fields = header.split()
    if len(fields) < 5:
        return unparsed

    kind = fields[4].split(':')
    if len(kind) < 2:
        return unparsed

    head = fields[0]
    ts = ' '.join(fields[1:3])
    svc = fields[3]
    typ, code = kind[0], kind[1]
    msg = tail.strip()
    return head, ts, svc, typ, code, msg

In [None]:
sq_brackets(df.log[0])

#### 3.3. Make a whole dataset

##### 1st way

In [None]:
# run `row_info` on the `kubernetes` field of every row and
# expand the returned tuple into separate columns
df_applied = df.apply(
    lambda record: row_info(record.kubernetes),
    axis=1,
    result_type='expand',
)

In [None]:
df_applied.head()

In [None]:
df_new = pd.concat([df, df_applied], axis='columns')
df_new.head()

##### 2nd way (more pythonic)

In [None]:
# `map` yields one (img, hub, host) tuple per row; `zip(*...)` transposes
# them into three column-wise sequences assigned in a single statement
df['img'], df['hub'], df['host'] = zip(*df['kubernetes'].map(row_info))
df.head()

In [None]:
# same transposition trick for the six values returned by `sq_brackets`:
# one tuple per row is turned into six column-wise sequences
df['head'], df['timestamp'], df['service'], \
    df['event_type'], df['event_code'], df['message'] \
    = zip(*df['log'].map(sq_brackets))
df.head()

In [None]:
df.message[0]

#### 3.4. Find users' activities (some magic and monkey job)

In [None]:
df.shape

In [None]:
# literal (non-regex) substring match; rows whose message is missing
# (NaN) count as non-matching instead of breaking the boolean mask
df_user = df[df['message'].str.contains("vgarshin", regex=False, na=False)]
df_user.shape

In [None]:
df_user.head()

In [None]:
df_user.message.values[0]

In [None]:
# literal (non-regex) substring match; rows whose message is missing
# (NaN) count as non-matching instead of breaking the boolean mask
df[df['message'].str.contains("start", regex=False, na=False)]

In [None]:
df.event_code.value_counts()

In [None]:
event_codes = df.event_code.unique()
event_codes

In [None]:
# print up to two sample messages for every event code
for ec in event_codes:
    print('code', ec)
    # filter once per code and keep no more than the first two messages
    samples = df.loc[df.event_code == ec, 'message'].values[:2]
    print(samples[0])
    # explicit length check instead of catching every exception
    if len(samples) > 1:
        print(samples[1])
    else:
        print('only one record')
    print('-' * 50)

In [None]:
# Parsing rules per event code:
#   (separator, token index, substrings to remove in order, activity label)
# A separator of `None` means "split on any whitespace".
_USER_ACTIVITY_RULES = {
    '43': (None, -1, (), 'logged out'),
    '757': (None, -1, (), 'logged in'),
    '402': (None, 0, (), 'pending spawn'),
    '1875': (None, 4, ('claim-', ','), 'attempt to create pvc with timeout'),
    '1887': (None, 1, ('claim-',), 'pvc already exists'),
    '1840': (None, 4, ('jupyter-', ','), 'attempting to create pod with timeout'),
    '1344': ('/', 3, (), 'failing suspected api request to not-running server'),
    '380': (None, 3, (), 'previous spawn failed'),
    '567': ('/', 4, (), 'stream closed while handling '),
    '681': (None, 0, ('\'s',), 'server failed to start'),
    '1997': ('-', -1, (), 'deleting pod'),
    '689': (None, 3, ('\'s',), 'unhandled error starting with timeout'),
    '1961': (None, 1, ('jupyter-',), 'restarting pod reflector'),
    '2044': (None, 1, ('jupyter-',), 'restarting pod reflector'),
    '257': (None, 2, (), 'adding user to proxy'),
    '664': (None, 1, (), 'server is ready'),
    '61': (None, 3, (), 'spawning sever with advanced configuration option'),
    '85': (None, 3, (), 'spawning sever with advanced configuration option'),
    '1143': (None, 1, (':',), 'server is slow to stop'),
    '2077': (None, 0, (), 'still running'),
    '167': (None, 1, (), 'server is already active'),
    '1067': (None, 1, (), 'user server stopped with exit code 1'),
    '2022': (None, 1, (), 'user server stopped with exit code 1'),
    '1857': (None, 3, ('jupyter-', ','), 'found existing pod and attempting to kill'),
    '1861': (None, 2, ('jupyter-', ','), 'killed pod and will try starting singleuser pod again'),
    '738': (None, 0, (',', '\'s'), 'server never showed up and giving up'),
    '2069': (None, 0, (',',), 'user does not appear to be running and shutting it down'),
    '148': (None, -1, (), 'user is running'),
    '1415': (None, -1, (), 'admin requesting spawn on behalf'),
    '1437': (None, 5, (',',), 'user requested server which user do not own'),
    '626': (None, 1, (), 'server is already started'),
    '2085': (None, 0, (), 'server appears to have stopped while the hub was down'),
}


def parce_users_activities(row):
    """
    Extracts the user name and a short activity label from a log row.

    `row` must support `row['event_code']` and `row['message']`.
    The event code selects a rule from `_USER_ACTIVITY_RULES`; the rule
    says how to split the message, which token holds the user name and
    which substrings have to be removed from that token.

    Returns a tuple `(user, log)`. Both values are empty strings when
    the event code is unknown or when the message has fewer tokens than
    the rule for its code expects.

    """
    code = row['event_code']
    msg = row['message']
    rule = _USER_ACTIVITY_RULES.get(code)
    if rule is None:
        return '', ''
    sep, idx, removals, log = rule
    try:
        user = msg.split(sep)[idx]
    except IndexError:
        # message does not have the layout expected for this code;
        # treat it like an unknown event instead of aborting the run
        return '', ''
    for fragment in removals:
        user = user.replace(fragment, '')
    return user, log

In [None]:
# `apply(..., axis=1)` calls the parser once per row and yields (user, log)
# tuples; `zip(*...)` transposes them into the two new columns
df['user'], df['log'] = zip(*df.apply(parce_users_activities, axis=1))
df.head()

In [None]:
df[df.user != ''].head()

In [None]:
# keep only the rows where a user was recognised and
# only the columns needed for the tables built below
df = df[df.user != ''][[
    'timestamp', 'hub', 'img', 'host',
    'event_code', 'event_type', 'log', 'user',
]].reset_index(drop=True)

df.head()

In [None]:
df.user.unique()

In [None]:
df.hub.unique()

In [None]:
df.host.unique()

### 4. Normalize data

In [None]:
df.info()

In [None]:
df.shape

#### 4.1. Users table

In [None]:
logins = df.user.unique()
print(len(logins))
logins

Use the `names` library (https://pypi.org/project/names/) to generate a random full name for every login.

In [None]:
!pip install names

In [None]:
import names

In [None]:
# one record per login: a generated full name and an e-mail built from the login
users = [
    {
        'login': login,
        'name': names.get_full_name(),
        'email': login + '@gsom.spbu.ru',
    }
    for login in logins
]
users[0]

In [None]:
df_users = pd.DataFrame(users)
df_users.head()

In [None]:
df.shape

In [None]:
df_users.to_csv('users.csv', sep=';', encoding='utf-8', index=False)

#### 4.2. JupyterHub instances table

In [None]:
# columns that describe a JupyterHub instance
df_instances = df.loc[:, ['hub', 'img', 'host']].reset_index(drop=True)
df_instances.head()

In [None]:
df_instances.shape

In [None]:
# keep one row per distinct (hub, img, host) combination and renumber the rows
df_instances = df_instances.drop_duplicates().reset_index(drop=True)
df_instances.shape

In [None]:
df_instances.head()

In [None]:
df_instances.to_csv('instances.csv', sep=';', encoding='utf-8', index=False)

#### 4.3. JupyterHub events table

### <font color='red'>HOME ASSIGNMENT</font>

In [None]:
# Your code will be here
# create a table `events` with columns `event_code`, `event_type`, `log`
# drop duplicates and save it to CSV file to import to database later 

#### 4.4. JupyterHub logs table

In [None]:
df.head()

In [None]:
# NOTE: this cell is only an example.
# For a normalized `logs` table you will need to keep only
# the `event_code` column as a key and remove
# the `event_type` and `log` columns.

df_logs = df[[
    'timestamp',
    'hub',
    'event_code',
    'event_type',   # to be removed
    'log',          # to be removed
    'user'
]].reset_index(drop=True)
df_logs.head()

In [None]:
# name the index so that it is written to the CSV as the `idx` column
df_logs.index.name = 'idx'
df_logs.to_csv('logs.csv', sep=';', encoding='utf-8', index=True)

### 5. Toy ETL pipeline

We will create a script like true data engineers!

### 6. Home assignment

Your home assignment for this part is:
1. Take the large file with log data `~/__DATA/IBDT_Spring_2023/topic_1/jhub_logs_large.csv`
2. Create a table for events (see `4.3. JupyterHub events table`) and save it as `csv` like we did with `users` and `instances` tables
3. Check your script for the pipeline