## Import modules

In [1]:
import os

import numpy as np
import pandas as pd
import psycopg2
from sqlalchemy import create_engine

## Get database credentials

In [2]:
# Connection settings are read from the environment so no credentials live in
# the notebook; a missing variable raises KeyError right here.
HOST, PORT, USER, PASS, DB = (
    os.environ[env_name]
    for env_name in (
        'POSTGRES_HOST',
        'POSTGRES_PORT',
        'POSTGRES_USER',
        'POSTGRES_PASS',
        'POSTGRES_DB',
    )
)

## Task #1: latest earlier same-site session per communication (pandas)

In [3]:
def create_connection(db_host, db_port, db_user, db_password, db_name):
    """Open a psycopg2 connection to a PostgreSQL database.

    Parameters
    ----------
    db_host, db_port, db_user, db_password, db_name
        Forwarded to ``psycopg2.connect`` as ``host``, ``port``, ``user``,
        ``password`` and ``database`` respectively.

    Returns
    -------
    The new connection, or ``None`` if ``psycopg2.connect`` raised
    ``psycopg2.OperationalError`` (the error is printed, not re-raised).
    """
    # Pre-bind so a failed connect returns None instead of raising
    # UnboundLocalError at the return statement below.
    connection = None
    try:
        connection = psycopg2.connect(
            host=db_host,
            port=db_port,
            user=db_user,
            password=db_password,
            database=db_name,
        )
    except psycopg2.OperationalError as err:
        print(err)
    return connection

In [4]:
# Open the connection that the query cells below reuse.
connection = create_connection(
    db_host=HOST,
    db_port=PORT,
    db_user=USER,
    db_password=PASS,
    db_name=DB,
)

In [5]:
def execute_query(connection, query):
    """Run ``query`` on ``connection`` and return the fetched rows.

    Parameters
    ----------
    connection
        An open psycopg2 connection.
    query : str
        SQL text to execute.

    Returns
    -------
    list of tuple or None
        ``cursor.fetchall()`` for statements that produce rows. ``None`` for
        statements that produce no result set (``cursor.description is None``)
        or when ``psycopg2.OperationalError`` is raised (the error is printed).
    """
    try:
        # The cursor's context manager closes it on exit (including on error);
        # it does not commit or roll back the transaction.
        with connection.cursor() as cursor:
            cursor.execute(query)
            if cursor.description is None:
                # Nothing to fetch; fetchall() would raise ProgrammingError.
                return None
            return cursor.fetchall()
    except psycopg2.OperationalError as err:
        print(err)
        # A failed command aborts the current transaction; roll it back so
        # later queries on this connection are not rejected. Skip it when the
        # connection itself is already closed/broken.
        if not connection.closed:
            connection.rollback()
        return None

In [6]:
# Column labels for the DataFrames built from each table.
sessions_columns = [
    'visitor_session_id',
    'site_id',
    'visitor_id',
    'date_time',
    'campaign_id'
]
communications_columns = [
    'communication',
    'site_id',
    'visitor_id',
    'date_time'
]
# The SELECTs name their columns explicitly, in the same order as the labels
# above, so the positional label assignment does not depend on the tables'
# physical column order (as `SELECT *` would). The communications key column
# is `communication_id` in the database and is labelled `communication` here.
get_sessions_query = (
    'SELECT visitor_session_id, site_id, visitor_id, date_time, campaign_id '
    'FROM sessions;'
)
get_communications_query = (
    'SELECT communication_id, site_id, visitor_id, date_time '
    'FROM communications;'
)

In [7]:
# Pull both raw tables into memory as lists of row tuples (sessions first).
sessions, communications = (
    execute_query(connection, table_query)
    for table_query in (get_sessions_query, get_communications_query)
)

In [8]:
# Wrap the raw row tuples in DataFrames labelled with the column lists above.
sessions_df = pd.DataFrame(sessions, columns=sessions_columns)
communications_df = pd.DataFrame(
    communications,
    columns=communications_columns,
)

In [9]:
# Pair every communication with every session of the same visitor. Overlapping
# column names get '_com' (communications) / '_ses' (sessions) suffixes; the
# site and timing constraints are applied in a later cell.
merged_df = communications_df.merge(
    sessions_df,
    how='left',
    on='visitor_id',
    suffixes=('_com', '_ses'),
)

In [10]:
# Keep only sessions on the same site that started before the communication,
# then number each communication's sessions 1, 2, ... in session-time order.
merged_df = (
    merged_df
    .query('site_id_com == site_id_ses and date_time_com > date_time_ses')
    .sort_values(['communication', 'date_time_ses'])
)
merged_df['row_n'] = (
    merged_df
    .sort_values('date_time_ses')
    .groupby('communication')
    .cumcount()
    + 1
)

In [11]:
# Store row_n using pandas' nullable integer dtype.
merged_df['row_n'] = merged_df['row_n'].astype('Int64')
# A stable sort keeps the per-communication session-time order produced by the
# previous cell; the default quicksort may reorder rows with equal keys.
merged_df = (
    merged_df
    .sort_values('communication', kind='stable')
    .reset_index(drop=True)
)

In [12]:
# site_id_ses equals site_id_com after the same-site filter, so drop the copy.
merged_df = merged_df.drop(columns=['site_id_ses'])

In [13]:
# Give the merged columns descriptive names. Only keys that actually exist are
# listed: `visitor_id` was the join key (so it carries no suffix),
# `communication` comes only from communications_df, and `row_n` already has
# its final name.
merged_df = merged_df.rename(
    columns={
        'communication': 'communication_id',
        'site_id_com': 'site_id',
        'date_time_com': 'communication_date_time',
        'date_time_ses': 'session_date_time',
    }
)

In [21]:
# Task #1 result: one row per communication, holding its latest earlier
# same-site session. row_n numbers each communication's sessions in
# session-time order, so the row with the largest row_n is the latest one
# (this mirrors the `row_n = max_n` selection in the SQL version).
# One groupby replaces a loop that re-scanned the whole frame per
# communication and re-copied the growing result on every concat.
result_df = (
    merged_df
    .loc[merged_df.groupby('communication_id')['row_n'].idxmax()]
    .reset_index(drop=True)
)

## Task #2: the same result with a single PostgreSQL query

In [15]:
# Same result as Task #1, computed in PostgreSQL:
#   merged_table numbers each communication's earlier same-site sessions by
#   session time (row_n); the outer query keeps the row whose row_n equals the
#   per-communication maximum, i.e. the latest such session.
# The WHERE clause filters on columns of `s`, so communications with no
# matching session are dropped (the LEFT JOIN acts like an INNER JOIN), just
# as the pandas query filter does in Task #1.
# The outer ORDER BY is required: ORDER BY inside a CTE/subquery does not fix
# the output order, and the result is compared row-by-row with result_df.
psql = """
WITH merged_table AS (
    SELECT
        c.communication_id AS communication_id,
        c.site_id AS site_id,
        c.visitor_id AS visitor_id,
        c.date_time AS communication_date_time,
        s.visitor_session_id::BIGINT AS visitor_session_id,
        s.date_time AS session_date_time,
        s.campaign_id AS campaign_id,
        ROW_NUMBER() over(PARTITION BY c.communication_id ORDER BY (s.date_time)) AS row_n
    FROM
        communications AS c
    LEFT JOIN
        sessions AS s
    ON
        c.visitor_id = s.visitor_id
    WHERE
        c.site_id = s.site_id
        AND c.date_time > s.date_time
    )

SELECT
    mt.communication_id,
    mt.site_id,
    mt.visitor_id,
    mt.communication_date_time,
    CASE
        WHEN sr.session_date_time IS NOT NULL THEN mt.visitor_session_id
    END AS visitor_session_id,
    sr.session_date_time,
    CASE
        WHEN sr.session_date_time IS NOT NULL THEN mt.campaign_id
    END AS campaign_id,
    sr.max_n AS row_n
FROM
    merged_table AS mt
LEFT JOIN
    (
    SELECT
        communication_id,
        MAX(row_n) AS max_n,
        MAX(session_date_time) AS session_date_time
    FROM
        merged_table
    GROUP BY
        communication_id
    ) AS sr
ON
    mt.communication_id = sr.communication_id
WHERE
    mt.row_n = sr.max_n
ORDER BY
    mt.communication_id
"""
execute_psql = execute_query(connection, psql)
# Reuse result_df's column labels; the SELECT list above is in the same order.
result_psql = pd.DataFrame(execute_psql, columns=result_df.columns.to_list())
# Store the id columns using pandas' nullable integer dtype.
result_psql = result_psql.astype({'visitor_session_id': 'Int64', 'campaign_id': 'Int64'})

## Compare the two results

In [26]:
# DataFrame.compare keeps only the cells that differ (and raises ValueError if
# the two frames are not identically labelled), so an empty diff means the
# pandas and SQL results agree.
diff = result_df.compare(other=result_psql)
diff.empty

True