# Introduction to Big Data Modern Technologies course

## FINAL PROJECT: lab work
### Part 1. Object storage and database pipeline (Spark)

### 1. Libraries and credentials

In [1]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
import io
import sys
import json
import boto3
import logging
import psycopg2
import requests
import datetime
import multiprocessing
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import udf, struct, countDistinct

In [3]:
def access_data(file_path):
    with open(file_path) as file:
        access_data = json.load(file)
    return access_data

creds = access_data(file_path='Selezneva_access_bucket.json')
print(creds.keys())

dict_keys(['aws_access_key_id', 'aws_secret_access_key'])


### 2. Browse files at S3

In [4]:
session = boto3.session.Session()
s3 = session.client(
    service_name='s3',
    aws_access_key_id=creds['aws_access_key_id'],
    aws_secret_access_key=creds['aws_secret_access_key'],
    endpoint_url='https://storage.yandexcloud.net'
)

In [5]:
DATA_BUCKET = 'shelkoviydivan'

In [6]:
all_files = [key['Key'] for key in s3.list_objects(Bucket=DATA_BUCKET)['Contents']]
print('files in storage:', all_files[:10]) # works only for num of files < 1000

files in storage: ['DataCoSupplyChainDataset.csv', 'DescriptionDataCoSupplyChain.csv', 'jhub_logs_large.csv', 'tokenized_access_logs.csv']


### 3. Connection to database

##### <font color='blue'>ClickHouse</font>

In [7]:
PATH_TO_JAR = '/home/jovyan/__DATA/IBDT_Spring_2023/topics_labs/jars/clickhouse-jdbc-0.4.0-shaded.jar'
CLICKHOUSE_JAR = f'file://{PATH_TO_JAR}'

In [8]:
def execute_query(query, access_ch, data=None):
    url = 'https://{host}:{port}/'.format(
        host=access_ch['host'],
        port=access_ch['port']
    )
    params = {
        'database': access_ch['dbname'],
        'query': query.strip()
    }
    auth = {
        'X-ClickHouse-User': access_ch['user'],
        'X-ClickHouse-Key': access_ch['password']
    }
    rs = requests.post(
        url, 
        params=params, 
        headers=auth, 
        data=data,
        verify=f'/home/jovyan/__DATA/IBDT_Spring_2023/topics_labs/{access_ch["sslrootcert"]}'
    )
    return rs

In [9]:
access_ch = access_data('access_ch.json')
print(access_ch.keys())

dict_keys(['host', 'port', 'dbname', 'user', 'password', 'sslrootcert'])


In [10]:
access_ch.values()

dict_values(['rc1a-khd773h8uvvb9sug.mdb.yandexcloud.net', 8443, 'db1', 'user1', 'pasword1', 'YandexInternalRootCA.crt'])

In [11]:
query = '''
SELECT version()
'''
rs = execute_query(query, access_ch)
rs.text

'23.3.1.2823\n'

##### <font color='green'>PostgreSQL</font>

In [12]:
access_postgres = access_data('access_postgres.json')
print(access_postgres.keys())

dict_keys(['host', 'port', 'dbname', 'user', 'password', 'sslrootcert'])


In [13]:
#access_postgres

In [14]:
def send_query(query, access_postgres, res=False):
    result = None
    with psycopg2.connect(
        host=access_postgres['host'],
        port=access_postgres['port'],
        dbname=access_postgres['dbname'],
        user=access_postgres['user'],
        password=access_postgres['password'],
        target_session_attrs='read-write',
        sslmode='verify-full',
        sslrootcert=f'/home/jovyan/__DATA/IBDT_Spring_2023/topics_labs/{access_postgres["sslrootcert"]}'
        ) as conn:
            with conn.cursor() as cur:
                cur.execute(query)
                if res:
                    result = cur.fetchall()
    return result

In [15]:
query = '''
SELECT version()
'''
send_query(query, access_postgres, res=True)

[('PostgreSQL 14.7 (Ubuntu 14.7-201-yandex.52766.d65626c879) on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 7.5.0-3ubuntu1~18.04) 7.5.0, 64-bit',)]

### 4. Data preprocessing with Spark

In [16]:
# web UI for the Spark

def uiWebUrl(self):
    from urllib.parse import urlparse
    web_url = self._jsc.sc().uiWebUrl().get()
    port = urlparse(web_url).port
    return '{}proxy/{}/jobs/'.format(os.environ['JUPYTERHUB_SERVICE_PREFIX'], port)

SparkContext.uiWebUrl = property(uiWebUrl)

# Spark settings
conf = SparkConf()
conf.set('spark.master', 'local[*]')    # max 5 cores available
conf.set('spark.driver.memory', '12G')  # max 16 GB available
conf.set('spark.driver.extraClassPath', CLICKHOUSE_JAR) # CH connect
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# Spark's access for object storage settings
spark._jsc.hadoopConfiguration().set('fs.s3a.access.key', creds['aws_access_key_id'])
spark._jsc.hadoopConfiguration().set('fs.s3a.secret.key', creds['aws_secret_access_key'])
spark._jsc.hadoopConfiguration().set('fs.s3a.impl','org.apache.hadoop.fs.s3a.S3AFileSystem')
spark._jsc.hadoopConfiguration().set('fs.s3a.multipart.size', '104857600')
spark._jsc.hadoopConfiguration().set('fs.s3a.block.size', '33554432')
spark._jsc.hadoopConfiguration().set('fs.s3a.threads.max', '256')
spark._jsc.hadoopConfiguration().set('fs.s3a.endpoint', 'http://storage.yandexcloud.net')

spark

23/04/13 11:35:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


#### 4.1. Read data with Spark

In [17]:
sdf = spark.read.csv(
    f's3a://{DATA_BUCKET}/jhub_logs_large.csv',
    sep=';', 
    header=True,
    multiLine=True, # if you have `\n` symbols
    escape="\""
)
sdf.printSchema()

23/04/13 11:35:35 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


root
 |-- date: string (nullable = true)
 |-- kubernetes: string (nullable = true)
 |-- log: string (nullable = true)
 |-- stream: string (nullable = true)
 |-- time: string (nullable = true)



In [18]:
sdf.show(5)

+--------------------+--------------------+--------------------+------+--------------------+
|                date|          kubernetes|                 log|stream|                time|
+--------------------+--------------------+--------------------+------+--------------------+
|2022-12-09T04:50:...|Row(annotations=R...|[I 2022-12-09 04:...|stderr|2022-12-09T04:50:...|
|2022-12-09T04:50:...|Row(annotations=R...|[W 2022-12-09 04:...|stderr|2022-12-09T04:50:...|
|2022-12-09T04:50:...|Row(annotations=R...|[I 2022-12-09 04:...|stderr|2022-12-09T04:50:...|
|2022-12-09T04:50:...|Row(annotations=R...|[W 2022-12-09 04:...|stderr|2022-12-09T04:50:...|
|2022-12-09T04:50:...|Row(annotations=R...|[I 2022-12-09 04:...|stderr|2022-12-09T04:50:...|
+--------------------+--------------------+--------------------+------+--------------------+
only showing top 5 rows



#### 4.2. Kubernetes logs

In [19]:
def row_info(rin):
    """
    Extracts names of:
      - docker image
      - id of the Jupyter application
      - name of the host, where Jupyter runs
    
    """
    img = rin[rin.find('container_image='):].split('\'')[1]
    hub = rin[rin.find('pod_name='):].split('\'')[1]
    host = rin[rin.find('host='):].split('\'')[1]
    return img, hub, host

In [20]:
udf_row_info = udf(row_info, ArrayType(StringType()))

In [21]:
sdf = sdf.withColumn('kubernetes_msg', udf_row_info('kubernetes'))
sdf.limit(5).toPandas()

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 643, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError
                                                                                

Unnamed: 0,date,kubernetes,log,stream,time,kubernetes_msg
0,2022-12-09T04:50:48.335844Z,Row(annotations=Row(checksum/config-map='ce892...,[I 2022-12-09 04:50:48.335 JupyterHub log:181]...,stderr,2022-12-09T04:50:48.33584421Z,"[jupyterhub/k8s-hub:0.11.1, hub-5c66c6c96f-p5x..."
1,2022-12-09T04:50:48.359937Z,Row(annotations=Row(checksum/config-map='ce892...,[W 2022-12-09 04:50:48.359 JupyterHub log:181]...,stderr,2022-12-09T04:50:48.359937031Z,"[jupyterhub/k8s-hub:0.11.1, hub-5c66c6c96f-p5x..."
2,2022-12-09T04:50:55.940651Z,Row(annotations=Row(checksum/config-map='ce892...,[I 2022-12-09 04:50:55.940 JupyterHub log:181]...,stderr,2022-12-09T04:50:55.940651688Z,"[jupyterhub/k8s-hub:0.11.1, hub-5c66c6c96f-p5x..."
3,2022-12-09T04:50:55.968410Z,Row(annotations=Row(checksum/config-map='ce892...,[W 2022-12-09 04:50:55.968 JupyterHub log:181]...,stderr,2022-12-09T04:50:55.968410334Z,"[jupyterhub/k8s-hub:0.11.1, hub-5c66c6c96f-p5x..."
4,2022-12-09T04:50:51.758320Z,Row(annotations=Row(checksum/config-map='ce892...,[I 2022-12-09 04:50:51.758 JupyterHub log:181]...,stderr,2022-12-09T04:50:51.758320284Z,"[jupyterhub/k8s-hub:0.11.1, hub-5c66c6c96f-p5x..."


In [22]:
sdf = sdf.select(
    'date',
    F.col('kubernetes_msg')[0].alias('img'),
    F.col('kubernetes_msg')[1].alias('hub'),
    F.col('kubernetes_msg')[2].alias('host'),
    'log',
    'stream',
    'time'
)
sdf.limit(5).toPandas()

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 643, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError


Unnamed: 0,date,img,hub,host,log,stream,time
0,2022-12-09T04:50:48.335844Z,jupyterhub/k8s-hub:0.11.1,hub-5c66c6c96f-p5xcc,cl1flrrk4hvdbm084md4-ahoc,[I 2022-12-09 04:50:48.335 JupyterHub log:181]...,stderr,2022-12-09T04:50:48.33584421Z
1,2022-12-09T04:50:48.359937Z,jupyterhub/k8s-hub:0.11.1,hub-5c66c6c96f-p5xcc,cl1flrrk4hvdbm084md4-ahoc,[W 2022-12-09 04:50:48.359 JupyterHub log:181]...,stderr,2022-12-09T04:50:48.359937031Z
2,2022-12-09T04:50:55.940651Z,jupyterhub/k8s-hub:0.11.1,hub-5c66c6c96f-p5xcc,cl1flrrk4hvdbm084md4-ahoc,[I 2022-12-09 04:50:55.940 JupyterHub log:181]...,stderr,2022-12-09T04:50:55.940651688Z
3,2022-12-09T04:50:55.968410Z,jupyterhub/k8s-hub:0.11.1,hub-5c66c6c96f-p5xcc,cl1flrrk4hvdbm084md4-ahoc,[W 2022-12-09 04:50:55.968 JupyterHub log:181]...,stderr,2022-12-09T04:50:55.968410334Z
4,2022-12-09T04:50:51.758320Z,jupyterhub/k8s-hub:0.11.1,hub-5c66c6c96f-p5xcc,cl1flrrk4hvdbm084md4-ahoc,[I 2022-12-09 04:50:51.758 JupyterHub log:181]...,stderr,2022-12-09T04:50:51.758320284Z


#### 4.3. JupyterHub logs

In [23]:
def sq_brackets(sin):
    """
    Split log string amd extracts:
      - timestamp of the event
      - name of application
      - type of logs
      - code of event
      - description
    
    """
    try:
        s = sin.split('[', 1)[1].split(']')[0]
        msg = sin[len(s) + 2 :].strip()
        s = s.split()
        head = s[0]
        ts = ' '.join(s[1:3])
        svc = s[3]
        typ = s[4].split(':')[0]
        code = s[4].split(':')[1]
    except:
        head, ts, svc, typ, code = '', '', '', '', ''
        msg = sin
    return head, ts, svc, typ, code, msg

In [24]:
udf_sq_brackets = udf(sq_brackets, ArrayType(StringType()))

In [25]:
sdf = sdf.withColumn('log_msg', udf_sq_brackets('log'))
sdf.limit(5).toPandas()

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 643, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError


Unnamed: 0,date,img,hub,host,log,stream,time,log_msg
0,2022-12-09T04:50:48.335844Z,jupyterhub/k8s-hub:0.11.1,hub-5c66c6c96f-p5xcc,cl1flrrk4hvdbm084md4-ahoc,[I 2022-12-09 04:50:48.335 JupyterHub log:181]...,stderr,2022-12-09T04:50:48.33584421Z,"[I, 2022-12-09 04:50:48.335, JupyterHub, log, ..."
1,2022-12-09T04:50:48.359937Z,jupyterhub/k8s-hub:0.11.1,hub-5c66c6c96f-p5xcc,cl1flrrk4hvdbm084md4-ahoc,[W 2022-12-09 04:50:48.359 JupyterHub log:181]...,stderr,2022-12-09T04:50:48.359937031Z,"[W, 2022-12-09 04:50:48.359, JupyterHub, log, ..."
2,2022-12-09T04:50:55.940651Z,jupyterhub/k8s-hub:0.11.1,hub-5c66c6c96f-p5xcc,cl1flrrk4hvdbm084md4-ahoc,[I 2022-12-09 04:50:55.940 JupyterHub log:181]...,stderr,2022-12-09T04:50:55.940651688Z,"[I, 2022-12-09 04:50:55.940, JupyterHub, log, ..."
3,2022-12-09T04:50:55.968410Z,jupyterhub/k8s-hub:0.11.1,hub-5c66c6c96f-p5xcc,cl1flrrk4hvdbm084md4-ahoc,[W 2022-12-09 04:50:55.968 JupyterHub log:181]...,stderr,2022-12-09T04:50:55.968410334Z,"[W, 2022-12-09 04:50:55.968, JupyterHub, log, ..."
4,2022-12-09T04:50:51.758320Z,jupyterhub/k8s-hub:0.11.1,hub-5c66c6c96f-p5xcc,cl1flrrk4hvdbm084md4-ahoc,[I 2022-12-09 04:50:51.758 JupyterHub log:181]...,stderr,2022-12-09T04:50:51.758320284Z,"[I, 2022-12-09 04:50:51.758, JupyterHub, log, ..."


In [26]:
sdf = sdf.select(
    'img',
    'hub',
    'host',
    F.col('log_msg')[0].alias('head'),
    F.col('log_msg')[1].alias('timestamp'),
    F.col('log_msg')[2].alias('service'),
    F.col('log_msg')[3].alias('event_type'),
    F.col('log_msg')[4].alias('event_code'),
    F.col('log_msg')[5].alias('message')
)
sdf.limit(5).toPandas()

Unnamed: 0,img,hub,host,head,timestamp,service,event_type,event_code,message
0,jupyterhub/k8s-hub:0.11.1,hub-5c66c6c96f-p5xcc,cl1flrrk4hvdbm084md4-ahoc,I,2022-12-09 04:50:48.335,JupyterHub,log,181,302 GET /utilities/login/index.php -> /hub/uti...
1,jupyterhub/k8s-hub:0.11.1,hub-5c66c6c96f-p5xcc,cl1flrrk4hvdbm084md4-ahoc,W,2022-12-09 04:50:48.359,JupyterHub,log,181,404 GET /hub/utilities/login/index.php (@10.11...
2,jupyterhub/k8s-hub:0.11.1,hub-5c66c6c96f-p5xcc,cl1flrrk4hvdbm084md4-ahoc,I,2022-12-09 04:50:55.940,JupyterHub,log,181,302 GET /test-output/ -> /hub/test-output/ (@1...
3,jupyterhub/k8s-hub:0.11.1,hub-5c66c6c96f-p5xcc,cl1flrrk4hvdbm084md4-ahoc,W,2022-12-09 04:50:55.968,JupyterHub,log,181,404 GET /hub/test-output/ (@10.112.128.1) 1.19ms
4,jupyterhub/k8s-hub:0.11.1,hub-5c66c6c96f-p5xcc,cl1flrrk4hvdbm084md4-ahoc,I,2022-12-09 04:50:51.758,JupyterHub,log,181,302 GET /admin.pl -> /hub/admin.pl (@10.112.12...


#### 4.4. Find users' activities

In [27]:
def parce_users_activities(code, msg):
    """
    Ugly function.
    
    You may use dictionary to make it
    more pythonic or something else.
    
    """
    if code == '43':
        user = msg.split()[-1]
        log = 'logged out'
    elif code == '757':
        user = msg.split()[-1]
        log = 'logged in'
    elif code == '402':
        user = msg.split()[0]
        log = 'pending spawn'
    elif code == '1875':
        user = msg.split()[4].replace('claim-', '').replace(',', '')
        log = 'attempt to create pvc with timeout'
    elif code == '1887':
        user = msg.split()[1].replace('claim-', '')
        log = 'pvc already exists'
    elif code == '1840':
        user = msg.split()[4].replace('jupyter-', '').replace(',', '')
        log = 'attempting to create pod with timeout'
    elif code == '1344':
        user = msg.split('/')[3]
        log = 'failing suspected api request to not-running server'
    elif code == '380':
        user = msg.split()[3]
        log = 'previous spawn failed'
    elif code == '567':
        user = msg.split('/')[4]
        log = 'stream closed while handling '
    elif code == '681':
        user = msg.split()[0].replace('\'s', '')
        log = 'server failed to start'
    elif code == '1997':
        user = msg.split('-')[-1]
        log = 'deleting pod'
    elif code == '689':
        user = msg.split()[3].replace('\'s', '')
        log = 'unhandled error starting with timeout'
    elif code == '1961' or code == '2044':
        user = msg.split()[1].replace('jupyter-', '')
        log = 'restarting pod reflector'
    elif code == '257':
        user = msg.split()[2]
        log = 'adding user to proxy'
    elif code == '664':
        user = msg.split()[1]
        log = 'server is ready'
    elif code == '61' or code == '85':
        user = msg.split()[3]
        log = 'spawning sever with advanced configuration option'
    elif code == '1143':
        user = msg.split()[1].replace(':', '')
        log = 'server is slow to stop'
    elif code == '2077':
        user = msg.split()[0]
        log = 'still running'
    elif code == '167':
        user = msg.split()[1]
        log = 'server is already active'
    elif code == '1067' or code == '2022':
        user = msg.split()[1]
        log = 'user server stopped with exit code 1'
    elif code == '1857':
        user = msg.split()[3].replace('jupyter-', '').replace(',', '')
        log = 'found existing pod and attempting to kill'
    elif code == '1861':
        user = msg.split()[2].replace('jupyter-', '').replace(',', '')
        log = 'killed pod and will try starting singleuser pod again'
    elif code == '738':
        user = msg.split()[0].replace(',', '').replace('\'s', '')
        log = 'server never showed up and giving up'
    elif code == '2069':
        user = msg.split()[0].replace(',', '')
        log = 'user does not appear to be running and shutting it down'  
    elif code == '148':
        user = msg.split()[-1]
        log = 'user is running'
    elif code == '1415':
        user = msg.split()[-1]
        log = 'admin requesting spawn on behalf'
    elif code == '1437':
        user = msg.split()[5].replace(',', '')
        log = 'user requested server which user do not own'
    elif code == '626':
        user = msg.split()[1]
        log = 'server is already started'
    elif code == '2085':
        user = msg.split()[0]
        log = 'server appears to have stopped while the hub was down'
    else:
        user, log = '', ''
    return user, log

In [28]:
udf_parce_users_activities = udf(parce_users_activities, ArrayType(StringType()))

In [29]:
sdf = sdf.withColumn('user_act', udf_parce_users_activities(sdf['event_code'], sdf['message']))
sdf.limit(5).toPandas()

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 643, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError


Unnamed: 0,img,hub,host,head,timestamp,service,event_type,event_code,message,user_act
0,jupyterhub/k8s-hub:0.11.1,hub-5c66c6c96f-p5xcc,cl1flrrk4hvdbm084md4-ahoc,I,2022-12-09 04:50:48.335,JupyterHub,log,181,302 GET /utilities/login/index.php -> /hub/uti...,"[, ]"
1,jupyterhub/k8s-hub:0.11.1,hub-5c66c6c96f-p5xcc,cl1flrrk4hvdbm084md4-ahoc,W,2022-12-09 04:50:48.359,JupyterHub,log,181,404 GET /hub/utilities/login/index.php (@10.11...,"[, ]"
2,jupyterhub/k8s-hub:0.11.1,hub-5c66c6c96f-p5xcc,cl1flrrk4hvdbm084md4-ahoc,I,2022-12-09 04:50:55.940,JupyterHub,log,181,302 GET /test-output/ -> /hub/test-output/ (@1...,"[, ]"
3,jupyterhub/k8s-hub:0.11.1,hub-5c66c6c96f-p5xcc,cl1flrrk4hvdbm084md4-ahoc,W,2022-12-09 04:50:55.968,JupyterHub,log,181,404 GET /hub/test-output/ (@10.112.128.1) 1.19ms,"[, ]"
4,jupyterhub/k8s-hub:0.11.1,hub-5c66c6c96f-p5xcc,cl1flrrk4hvdbm084md4-ahoc,I,2022-12-09 04:50:51.758,JupyterHub,log,181,302 GET /admin.pl -> /hub/admin.pl (@10.112.12...,"[, ]"


In [30]:
sdf = sdf.select(
    'timestamp',
    'hub',
    'img',
    'host',
    'event_code',
    'event_type',
    F.col('user_act')[1].alias('log'),
    F.col('user_act')[0].alias('user')
)
sdf.limit(5).toPandas()

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 643, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError


Unnamed: 0,timestamp,hub,img,host,event_code,event_type,log,user
0,2022-12-09 04:50:48.335,hub-5c66c6c96f-p5xcc,jupyterhub/k8s-hub:0.11.1,cl1flrrk4hvdbm084md4-ahoc,181,log,,
1,2022-12-09 04:50:48.359,hub-5c66c6c96f-p5xcc,jupyterhub/k8s-hub:0.11.1,cl1flrrk4hvdbm084md4-ahoc,181,log,,
2,2022-12-09 04:50:55.940,hub-5c66c6c96f-p5xcc,jupyterhub/k8s-hub:0.11.1,cl1flrrk4hvdbm084md4-ahoc,181,log,,
3,2022-12-09 04:50:55.968,hub-5c66c6c96f-p5xcc,jupyterhub/k8s-hub:0.11.1,cl1flrrk4hvdbm084md4-ahoc,181,log,,
4,2022-12-09 04:50:51.758,hub-5c66c6c96f-p5xcc,jupyterhub/k8s-hub:0.11.1,cl1flrrk4hvdbm084md4-ahoc,181,log,,


In [31]:
sdf = sdf.filter(sdf.user != '')
sdf = sdf.withColumn(
    'timestamp',
    F.to_timestamp('timestamp', 'yyyy-MM-dd HH:mm:ss.SSS')
)
sdf.limit(5).toPandas()

Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 643, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 643, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError
EOFError
Traceback (most recent call last):
  File "/us

Unnamed: 0,timestamp,hub,img,host,event_code,event_type,log,user
0,2023-02-13 09:30:12.727,hub-57c88d997b-xh654,jupyterhub/k8s-hub:0.11.1,cl1flrrk4hvdbm084md4-elef,257,proxy,adding user to proxy,st107874
1,2023-02-13 09:30:12.729,hub-57c88d997b-xh654,jupyterhub/k8s-hub:0.11.1,cl1flrrk4hvdbm084md4-elef,664,users,server is ready,st107874
2,2023-02-13 09:30:12.730,hub-57c88d997b-xh654,jupyterhub/k8s-hub:0.11.1,cl1flrrk4hvdbm084md4-elef,664,users,server is ready,st107874
3,2023-02-13 09:30:13.534,hub-57c88d997b-xh654,jupyterhub/k8s-hub:0.11.1,cl1flrrk4hvdbm084md4-elef,257,proxy,adding user to proxy,st112224
4,2023-02-13 09:30:13.536,hub-57c88d997b-xh654,jupyterhub/k8s-hub:0.11.1,cl1flrrk4hvdbm084md4-elef,664,users,server is ready,st112224


### 5. Normalize data and write to database

#### 5.1. Users table

In [32]:
logins = sdf.select('user').distinct().collect()
print(len(logins))
logins = [list(x)[0] for x in logins]
logins[:5]



165


                                                                                

['st097322', 'st109438', 'st110696', 'st107590', 'st109257']

In [33]:
!pip install names



In [34]:
import names

In [35]:
users = []
for login in logins:
    user = {}
    user['login'] = login
    user['name'] = names.get_full_name()
    user['email'] = login + '@gsom.spbu.ru'
    users.append(user)

In [36]:
rdd = sc.parallelize([users])
sdf_users = spark.read.json(rdd)
sdf_users.printSchema()

root
 |-- email: string (nullable = true)
 |-- login: string (nullable = true)
 |-- name: string (nullable = true)



In [37]:
sdf_users.show(5)

+--------------------+--------+---------------+
|               email|   login|           name|
+--------------------+--------+---------------+
|st097322@gsom.spb...|st097322|Garrett Brannen|
|st109438@gsom.spb...|st109438|   Tiffany Hunt|
|st110696@gsom.spb...|st110696|    April Jones|
|st107590@gsom.spb...|st107590|Felicia Burdett|
|st109257@gsom.spb...|st109257|   Tina Harding|
+--------------------+--------+---------------+
only showing top 5 rows



##### <font color='blue'>ClickHouse</font>

In [38]:
query = '''
DROP TABLE db1.users;
'''
rs = execute_query(query, access_ch)
rs.text

''

In [39]:
query = '''
CREATE TABLE IF NOT EXISTS db1.users (
    email String,
    login String,
    name String
) ENGINE = MergeTree
ORDER BY email;
'''
rs = execute_query(query, access_ch)
rs

<Response [200]>

In [40]:
URL_CH = 'jdbc:clickhouse://{}:{}/{}'.format(
    access_ch['host'],
    access_ch['port'],
    access_ch['dbname']
)
DRIVER_CH = 'com.clickhouse.jdbc.ClickHouseDriver'

In [41]:
table_name = 'users'

sdf_users.write \
    .format('jdbc') \
    .mode('append') \
    .option('url', URL_CH) \
    .option('user', access_ch['user']) \
    .option('password', access_ch['password']) \
    .option('dbtable', table_name) \
    .option('driver', DRIVER_CH) \
    .option('ssl', 1) \
    .option('sslmode', 'strict') \
    .option('sslrootcert', f'/home/jovyan/__DATA/IBDT_Spring_2023/topics_labs/{access_ch["sslrootcert"]}') \
    .save()

23/04/13 11:36:07 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction is not supported. You may change jdbcCompliant to false to throw SQLException instead.
23/04/13 11:36:07 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction is not supported. You may change jdbcCompliant to false to throw SQLException instead.
23/04/13 11:36:07 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction is not supported. You may change jdbcCompliant to false to throw SQLException instead.
23/04/13 11:36:07 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction [c81b0311-4ccb-4c24-b388-795d3e6c98a3] (1 queries & 0 savepoints) is committed.
23/04/13 11:36:07 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction [282a7c10-1bfd-456f-8ff1-f068559fad24] (0 queries & 0 savepoints) is committed.
23/04/13 11:36:07 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction is not supported. You may change jdbcCompliant to false to throw SQLException ins

In [42]:
# test query
query = '''
SELECT * FROM db1.users LIMIT 5;
'''
rs = execute_query(query, access_ch)
print(rs.text)

aasoloviev@gsom.spbu.ru	aasoloviev	John Lawrence
ab2216206@gsom.spbu.ru	ab2216206	Megan Davis
ab2219090@gsom.spbu.ru	ab2219090	Mary Nelson
abulatov@gsom.spbu.ru	abulatov	Jennifer Ramirez
albulatov@gsom.spbu.ru	albulatov	Isabel Lucero



##### <font color='green'>PostgreSQL</font>

In [43]:
query = '''
DROP TABLE IF EXISTS users
'''
send_query(query, access_postgres)

In [44]:
query = '''
CREATE TABLE IF NOT EXISTS users (
    email varchar(128),
    login varchar(32),
    name varchar(128)
);
'''
send_query(query, access_postgres)

In [45]:
URL_PG = 'jdbc:postgresql://{}:{}/{}'.format(
    access_postgres["host"],
    access_postgres["port"],
    access_postgres["dbname"]
)
DRIVER_PG = 'org.postgresql.Driver'

In [46]:
sdf_users.write\
    .mode('append') \
    .format('jdbc') \
    .option('url', URL_PG) \
    .option('dbtable', table_name) \
    .option('user', access_postgres['user']) \
    .option('password', access_postgres['password']) \
    .option('driver', DRIVER_PG) \
    .option('ssl', True) \
    .option('sslmode', 'require') \
    .save()

In [47]:
query = '''
SELECT * FROM users LIMIT 5;
'''
send_query(query, access_postgres, res=True)

[('st097322@gsom.spbu.ru', 'st097322', 'Garrett Brannen'),
 ('st109438@gsom.spbu.ru', 'st109438', 'Tiffany Hunt'),
 ('st110696@gsom.spbu.ru', 'st110696', 'April Jones'),
 ('st107590@gsom.spbu.ru', 'st107590', 'Felicia Burdett'),
 ('st109257@gsom.spbu.ru', 'st109257', 'Tina Harding')]

#### 5.2. JupyterHub instances table

In [48]:
sdf_instances = sdf.select(
    'hub',
    'img',
    'host'
)
sdf_instances = sdf_instances.dropDuplicates()

##### <font color='blue'>ClickHouse</font>

In [49]:
query = '''
DROP TABLE db1.instances;
'''
rs = execute_query(query, access_ch)
rs.text

''

In [50]:
query = '''
CREATE TABLE IF NOT EXISTS db1.instances (
    hub String,
    img String,
    host String
) ENGINE = MergeTree
ORDER BY hub;
'''
rs = execute_query(query, access_ch)
rs

<Response [200]>

In [51]:
table_name = 'instances'

sdf_instances.write \
    .format('jdbc') \
    .mode('append') \
    .option('url', URL_CH) \
    .option('user', access_ch['user']) \
    .option('password', access_ch['password']) \
    .option('dbtable', table_name) \
    .option('driver', DRIVER_CH) \
    .option('ssl', 1) \
    .option('sslmode', 'strict') \
    .option('sslrootcert', f'/home/jovyan/__DATA/IBDT_Spring_2023/topics_labs/{access_ch["sslrootcert"]}') \
    .save()

23/04/13 11:36:27 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction is not supported. You may change jdbcCompliant to false to throw SQLException instead.
23/04/13 11:36:27 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction is not supported. You may change jdbcCompliant to false to throw SQLException instead.
23/04/13 11:36:27 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction is not supported. You may change jdbcCompliant to false to throw SQLException instead.
23/04/13 11:36:27 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction [94728ffd-e186-440d-a92e-e648fba60ff9] (2 queries & 0 savepoints) is committed.
23/04/13 11:36:27 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction [d550a02f-384d-4b0c-bc68-674994d878e3] (0 queries & 0 savepoints) is committed.
23/04/13 11:36:27 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction is not supported. You may change jdbcCompliant to false to throw SQLException ins

In [52]:
# test query
query = '''
SELECT * FROM db1.instances LIMIT 5;
'''
rs = execute_query(query, access_ch)
print(rs.text)

hub-56bbc6d5f7-5wsph	jupyterhub/k8s-hub:0.11.1	cl1flrrk4hvdbm084md4-ahoc
hub-56bbc6d5f7-rgsct	jupyterhub/k8s-hub:0.11.1	cl1flrrk4hvdbm084md4-ahoc
hub-57c88d997b-xh654	jupyterhub/k8s-hub:0.11.1	cl1flrrk4hvdbm084md4-elef
hub-58f6d59b46-jfwm9	jupyterhub/k8s-hub:0.11.1	cl1flrrk4hvdbm084md4-ahoc
hub-59778cfbc5-kb9tb	jupyterhub/k8s-hub:0.11.1	cl1flrrk4hvdbm084md4-elef



##### <font color='green'>PostgreSQL</font>

In [53]:
query = '''
DROP TABLE IF EXISTS instances
'''
send_query(query, access_postgres)

In [54]:
query = '''
CREATE TABLE IF NOT EXISTS instances (
    hub varchar(64),
    img varchar(64),
    host varchar(64)
);
'''
send_query(query, access_postgres)

In [55]:
sdf_instances.write\
    .mode('append') \
    .format('jdbc') \
    .option('url', URL_PG) \
    .option('dbtable', table_name) \
    .option('user', access_postgres['user']) \
    .option('password', access_postgres['password']) \
    .option('driver', DRIVER_PG) \
    .option('ssl', True) \
    .option('sslmode', 'require') \
    .save()

                                                                                

In [56]:
query = '''
SELECT * FROM instances LIMIT 5;
'''
send_query(query, access_postgres, res=True)

[('hub-69c5fcf7fb-jxwwm',
  'jupyterhub/k8s-hub:0.11.1',
  'cl1flrrk4hvdbm084md4-elef'),
 ('hub-5b59fd67fc-v4shf',
  'jupyterhub/k8s-hub:0.11.1',
  'cl1flrrk4hvdbm084md4-okub'),
 ('hub-7c7876787d-96fnt',
  'jupyterhub/k8s-hub:0.11.1',
  'cl1flrrk4hvdbm084md4-elef'),
 ('hub-5b746df7c6-ssj98',
  'jupyterhub/k8s-hub:0.11.1',
  'cl1flrrk4hvdbm084md4-ahoc'),
 ('hub-5bb9b9c56c-s958d',
  'jupyterhub/k8s-hub:0.11.1',
  'cl1flrrk4hvdbm084md4-ahoc')]

#### 5.3. JupyterHub logs table

In [57]:
sdf_logs = sdf.select(
    'timestamp',
    'hub',
    'event_code',
    'event_type',
    'log',
    F.col('user').alias('login')
)
sdf_logs = sdf_logs.dropDuplicates()

##### <font color='blue'>ClickHouse</font>

In [58]:
query = '''
DROP TABLE db1.logs;
'''
rs = execute_query(query, access_ch)
rs.text

"Code: 60. DB::Exception: Table db1.logs doesn't exist. (UNKNOWN_TABLE) (version 23.3.1.2823 (official build))\n"

In [59]:
query = '''
CREATE TABLE IF NOT EXISTS db1.logs (
    timestamp DateTime,
    hub String,
    event_code Int32,
    event_type String,
    log String,
    login String
) ENGINE = MergeTree
ORDER BY hub;
'''
rs = execute_query(query, access_ch)
rs

<Response [200]>

In [60]:
table_name = 'logs'

sdf_logs.write \
    .format('jdbc') \
    .mode('append') \
    .option('url', URL_CH) \
    .option('user', access_ch['user']) \
    .option('password', access_ch['password']) \
    .option('dbtable', table_name) \
    .option('driver', DRIVER_CH) \
    .option('ssl', 1) \
    .option('sslmode', 'strict') \
    .option('sslrootcert', f'/home/jovyan/__DATA/IBDT_Spring_2023/topics_labs/{access_ch["sslrootcert"]}') \
    .save()

23/04/13 11:37:09 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction is not supported. You may change jdbcCompliant to false to throw SQLException instead.
23/04/13 11:37:09 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction is not supported. You may change jdbcCompliant to false to throw SQLException instead.
23/04/13 11:37:09 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction is not supported. You may change jdbcCompliant to false to throw SQLException instead.
23/04/13 11:37:09 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction is not supported. You may change jdbcCompliant to false to throw SQLException instead.
23/04/13 11:37:09 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction is not supported. You may change jdbcCompliant to false to throw SQLException instead.
23/04/13 11:37:09 WARN ClickHouseConnectionImpl: [JDBC Compliant Mode] Transaction [99bec9a8-3a70-4193-99a3-ca458a1b1810] (2 queries & 0 savepoints) i

In [61]:
# test query
query = '''
SELECT * FROM db1.logs LIMIT 5;
'''
rs = execute_query(query, access_ch)
print(rs.text)

2022-11-15 13:02:17	hub-56bbc6d5f7-5wsph	1344	base	failing suspected api request to not-running server	st061467
2022-11-15 13:02:28	hub-56bbc6d5f7-5wsph	1344	base	failing suspected api request to not-running server	st061467
2022-11-13 21:49:28	hub-56bbc6d5f7-5wsph	1344	base	failing suspected api request to not-running server	st102860
2022-11-13 20:28:34	hub-56bbc6d5f7-5wsph	1344	base	failing suspected api request to not-running server	st102860
2022-11-11 17:40:56	hub-56bbc6d5f7-5wsph	1344	base	failing suspected api request to not-running server	st102860



##### <font color='green'>PostgreSQL</font>

In [62]:
query = '''
DROP TABLE IF EXISTS logs
'''
send_query(query, access_postgres)

In [63]:
query = '''
CREATE TABLE IF NOT EXISTS logs (
    timestamp timestamp,
    hub varchar(64),
    event_code varchar(32),
    event_type varchar(64),
    log text,
    login varchar(32)
);
'''
send_query(query, access_postgres)

In [64]:
sdf_logs.write\
    .mode('append') \
    .format('jdbc') \
    .option('url', URL_PG) \
    .option('dbtable', table_name) \
    .option('user', access_postgres['user']) \
    .option('password', access_postgres['password']) \
    .option('driver', DRIVER_PG) \
    .option('ssl', True) \
    .option('sslmode', 'require') \
    .save()

                                                                                

In [65]:
query = '''
SELECT * FROM logs LIMIT 5;
'''
send_query(query, access_postgres, res=True)

[(datetime.datetime(2023, 2, 13, 9, 32, 22, 792000),
  'hub-57c88d997b-xh654',
  '402',
  'pages',
  'pending spawn',
  'st112224'),
 (datetime.datetime(2023, 2, 11, 7, 27, 31, 64000),
  'hub-57c88d997b-xh654',
  '402',
  'pages',
  'pending spawn',
  'st108949'),
 (datetime.datetime(2022, 9, 6, 6, 23, 39, 295000),
  'hub-5bb9b9c56c-5k7nq',
  '402',
  'pages',
  'pending spawn',
  'st110132'),
 (datetime.datetime(2022, 10, 25, 6, 51, 24, 576000),
  'hub-5bb9b9c56c-5k7nq',
  '1840',
  'spawner',
  'attempting to create pod with timeout',
  'st110869'),
 (datetime.datetime(2022, 11, 1, 8, 8, 29, 905000),
  'hub-5bb9b9c56c-s958d',
  '1344',
  'base',
  'failing suspected api request to not-running server',
  'st051829')]

### 6. How to get data back

#### 6.1. Requests and psycopg2 libraries

##### <font color='blue'>ClickHouse</font>

In [66]:
query = '''
SELECT \
    ls.timestamp, \
    ls.event_code, \
    us.login, \
    us.name, \
    us.email, \
    ls.log \
FROM logs AS ls \
LEFT JOIN users AS us ON ls.login = us.login \
LIMIT 5;
'''
rs = execute_query(query, access_ch)

In [67]:
rs

<Response [200]>

In [68]:
rs.text

'2022-11-11 14:16:54\t1344\tst102860\tMichael Bustamante\tst102860@gsom.spbu.ru\tfailing suspected api request to not-running server\n2022-11-13 21:50:28\t1344\tst102860\tMichael Bustamante\tst102860@gsom.spbu.ru\tfailing suspected api request to not-running server\n2022-11-11 18:20:56\t1344\tst102860\tMichael Bustamante\tst102860@gsom.spbu.ru\tfailing suspected api request to not-running server\n2022-11-11 13:39:54\t1344\tst102860\tMichael Bustamante\tst102860@gsom.spbu.ru\tfailing suspected api request to not-running server\n2022-11-10 16:23:16\t664\tst110923\tDan Ramos\tst110923@gsom.spbu.ru\tserver is ready\n'

In [69]:
rs.content

b'2022-11-11 14:16:54\t1344\tst102860\tMichael Bustamante\tst102860@gsom.spbu.ru\tfailing suspected api request to not-running server\n2022-11-13 21:50:28\t1344\tst102860\tMichael Bustamante\tst102860@gsom.spbu.ru\tfailing suspected api request to not-running server\n2022-11-11 18:20:56\t1344\tst102860\tMichael Bustamante\tst102860@gsom.spbu.ru\tfailing suspected api request to not-running server\n2022-11-11 13:39:54\t1344\tst102860\tMichael Bustamante\tst102860@gsom.spbu.ru\tfailing suspected api request to not-running server\n2022-11-10 16:23:16\t664\tst110923\tDan Ramos\tst110923@gsom.spbu.ru\tserver is ready\n'

In [70]:
df = pd.read_csv(io.StringIO(rs.text), sep='\t', header=None)
df

Unnamed: 0,0,1,2,3,4,5
0,2022-11-11 14:16:54,1344,st102860,Michael Bustamante,st102860@gsom.spbu.ru,failing suspected api request to not-running s...
1,2022-11-13 21:50:28,1344,st102860,Michael Bustamante,st102860@gsom.spbu.ru,failing suspected api request to not-running s...
2,2022-11-11 18:20:56,1344,st102860,Michael Bustamante,st102860@gsom.spbu.ru,failing suspected api request to not-running s...
3,2022-11-11 13:39:54,1344,st102860,Michael Bustamante,st102860@gsom.spbu.ru,failing suspected api request to not-running s...
4,2022-11-10 16:23:16,664,st110923,Dan Ramos,st110923@gsom.spbu.ru,server is ready


##### <font color='green'>PostgreSQL</font>

In [71]:
query = '''
SELECT \
    ls.timestamp, \
    ls.event_code, \
    us.login, \
    us.name, \
    us.email, \
    ls.log \
FROM logs AS ls \
LEFT JOIN users AS us ON ls.login = us.login \
LIMIT 5;
'''
send_query(query, access_postgres, res=True)

[(datetime.datetime(2023, 2, 13, 9, 32, 22, 792000),
  '402',
  'st112224',
  'Serena Bender',
  'st112224@gsom.spbu.ru',
  'pending spawn'),
 (datetime.datetime(2023, 2, 11, 7, 27, 31, 64000),
  '402',
  'st108949',
  'Louis Carboni',
  'st108949@gsom.spbu.ru',
  'pending spawn'),
 (datetime.datetime(2022, 9, 6, 6, 23, 39, 295000),
  '402',
  'st110132',
  'Loretta Turner',
  'st110132@gsom.spbu.ru',
  'pending spawn'),
 (datetime.datetime(2022, 10, 25, 6, 51, 24, 576000),
  '1840',
  'st110869',
  'Ralph Phillips',
  'st110869@gsom.spbu.ru',
  'attempting to create pod with timeout'),
 (datetime.datetime(2022, 11, 1, 8, 8, 29, 905000),
  '1344',
  'st051829',
  'Carmen Stancill',
  'st051829@gsom.spbu.ru',
  'failing suspected api request to not-running server')]

#### 6.2. Spark connection

##### <font color='blue'>ClickHouse</font>

In [72]:
table_name = 'users'

sdf_test = spark.read \
    .format('jdbc') \
    .option('url', URL_CH) \
    .option('dbtable', table_name) \
    .option('user', access_ch['user']) \
    .option('password', access_ch['password']) \
    .option('driver', DRIVER_CH) \
    .option('ssl', 1) \
    .option('sslmode', 'strict') \
    .option('sslrootcert', f'/home/jovyan/__DATA/IBDT_Spring_2023/topics_labs/{access_ch["sslrootcert"]}') \
    .load()

In [73]:
sdf_test.show(5)

+--------------------+----------+----------------+
|               email|     login|            name|
+--------------------+----------+----------------+
|aasoloviev@gsom.s...|aasoloviev|   John Lawrence|
|ab2216206@gsom.sp...| ab2216206|     Megan Davis|
|ab2219090@gsom.sp...| ab2219090|     Mary Nelson|
|abulatov@gsom.spb...|  abulatov|Jennifer Ramirez|
|albulatov@gsom.sp...| albulatov|   Isabel Lucero|
+--------------------+----------+----------------+
only showing top 5 rows



##### <font color='green'>PostgreSQL</font>

In [74]:
sdf_test = spark.read \
    .format('jdbc') \
    .option('url', URL_PG) \
    .option('dbtable', table_name) \
    .option('user', access_postgres['user']) \
    .option('password', access_postgres['password']) \
    .option('driver', DRIVER_PG) \
    .option('ssl', True) \
    .option('sslmode', 'require') \
    .load()

In [75]:
sdf_test.show(5)

+--------------------+--------+---------------+
|               email|   login|           name|
+--------------------+--------+---------------+
|st097322@gsom.spb...|st097322|Garrett Brannen|
|st109438@gsom.spb...|st109438|   Tiffany Hunt|
|st110696@gsom.spb...|st110696|    April Jones|
|st107590@gsom.spb...|st107590|Felicia Burdett|
|st109257@gsom.spb...|st109257|   Tina Harding|
+--------------------+--------+---------------+
only showing top 5 rows



#### 6.3. ClickHouse connect

[Python packages](https://pypi.org/project/clickhouse-connect/) for connecting Python to ClickHouse.

In [76]:
!pip install clickhouse-connect

Collecting clickhouse-connect
  Downloading clickhouse_connect-0.5.20-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (927 kB)
     |████████████████████████████████| 927 kB 2.1 MB/s            
Collecting lz4
  Downloading lz4-4.3.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
     |████████████████████████████████| 1.3 MB 108.6 MB/s            
[?25hCollecting zstandard
  Downloading zstandard-0.20.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.6 MB)
     |████████████████████████████████| 2.6 MB 105.7 MB/s            
Installing collected packages: zstandard, lz4, clickhouse-connect
Successfully installed clickhouse-connect-0.5.20 lz4-4.3.2 zstandard-0.20.0


In [77]:
import clickhouse_connect

In [78]:
client = clickhouse_connect.get_client(
    host=access_ch['host'], 
    username=access_ch['user'], 
    password=access_ch['password'],
    port=access_ch['port'],
    verify=f'/home/jovyan/__DATA/IBDT_Spring_2023/topics_labs/{access_ch["sslrootcert"]}'
)

In [79]:
client

<clickhouse_connect.driver.httpclient.HttpClient at 0x7f33d5c70820>

##### 6.3.1. Queries

In [80]:
client.command('SELECT COUNT(*) FROM db1.users')

165

In [81]:
result = client.query('SELECT * FROM db1.users LIMIT 5')
result.result_rows

[('aasoloviev@gsom.spbu.ru', 'aasoloviev', 'John Lawrence'),
 ('ab2216206@gsom.spbu.ru', 'ab2216206', 'Megan Davis'),
 ('ab2219090@gsom.spbu.ru', 'ab2219090', 'Mary Nelson'),
 ('abulatov@gsom.spbu.ru', 'abulatov', 'Jennifer Ramirez'),
 ('albulatov@gsom.spbu.ru', 'albulatov', 'Isabel Lucero')]

In [82]:
parameters = {
    'table1': 'logs',
    'table2': 'users',
    'key': 'login',
    'user': 'vgarshin@gsom.spbu.ru'
}
result = client.query(
    "SELECT COUNT(*) FROM db1.{table1:Identifier} AS ls \
    LEFT JOIN db1.{table2:Identifier} AS us ON ls.{key:Identifier} = us.{key:Identifier} \
    WHERE us.email = {user: String}",
    parameters=parameters
)
result.result_rows

[(3593,)]

##### 6.3.2. Insert data

In [83]:
row1 = ['test1@gsom.spbu.ru', 'test1', 'Test Number One']
row2 = ['test2@gsom.spbu.ru', 'test2', 'Test Number Two']
data = [row1, row2]
data

[['test1@gsom.spbu.ru', 'test1', 'Test Number One'],
 ['test2@gsom.spbu.ru', 'test2', 'Test Number Two']]

In [84]:
client.insert(
    'db1.users', 
    data, 
    column_names=['email', 'login', 'name']
)

In [85]:
result = client.query(
    "SELECT * FROM db1.users WHERE email='test1@gsom.spbu.ru'", 
    parameters=parameters
)
result.result_rows

[('test1@gsom.spbu.ru', 'test1', 'Test Number One')]

In [86]:
parameters = {
    'table': 'users', 
    'v1': 'Test%'
}
result = client.query(
    'SELECT * FROM db1.{table:Identifier} WHERE name LIKE {v1:String}', 
    parameters=parameters
)
result.result_rows

[('test1@gsom.spbu.ru', 'test1', 'Test Number One'),
 ('test2@gsom.spbu.ru', 'test2', 'Test Number Two')]