# DWH Project

## Importing Libraries and Connections

Creating connection to S3, initializing Spark session and reading data. 

In [2]:
#Spark connection with S3 options
import os
import socket
from pyspark.sql import SparkSession


# credentials to work with S3
aws_access_key = "access_key"
aws_secret_key = "secret_ley"
s3_bucket = "kc-hardda-projects"
s3_endpoint_url = "https://storage.yandexcloud.net"

# spark session
APACHE_MASTER_IP = socket.gethostbyname("apache-spark-master-0.apache-spark-headless.apache-spark.svc.cluster.local")
APACHE_MASTER_URL = f"spark://{APACHE_MASTER_IP}:7077"
POD_IP = os.environ["MY_POD_IP"]
SPARK_APP_NAME = f"spark-{os.environ['HOSTNAME']}"
JARS = """/nfs/env/lib/python3.8/site-packages/pyspark/jars/clickhouse-native-jdbc-shaded-2.6.5.jar, 
/nfs/env/lib/python3.8/site-packages/pyspark/jars/hadoop-aws-3.3.4.jar,
/nfs/env/lib/python3.8/site-packages/pyspark/jars/aws-java-sdk-bundle-1.12.433.jar
"""

MEM = "2048m"
CORES = 1
 
spark = SparkSession.\
        builder.\
        appName(SPARK_APP_NAME).\
        master(APACHE_MASTER_URL).\
        config("spark.executor.memory", MEM).\
        config("spark.jars", JARS).\
        config("spark.executor.cores", CORES).\
        config("spark.driver.host", POD_IP).\
        config("spark.hadoop.fs.s3a.access.key", aws_access_key). \
        config("spark.hadoop.fs.s3a.secret.key", aws_secret_key). \
        config("fs.s3a.endpoint", "https://storage.yandexcloud.net").  \
        config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"). \
        config("spark.hadoop.fs.s3a.path.style.access", True). \
        config("spark.hadoop.fs.s3a.committer.name", "directory"). \
        config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"). \
        getOrCreate()






23/09/26 15:09:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Spark

In [3]:
# reading data
df_adverts_all = spark.read.parquet('s3a://kc-hardda-projects/shared/adverts_data.parquet')
df_live_adverts = spark.read.parquet('s3a://kc-hardda-projects/shared/live_adverts.parquet')
df_user_passports = spark.read.parquet('s3a://kc-hardda-projects/shared/user_passports.parquet')

# creating dataframe from our data
df_flat = df_live_adverts.join(df_adverts_all, on=['execution_date', 'advert_id'], how='left') \
                   .join(df_user_passports, df_live_adverts['user_id'] == df_user_passports['global_id'], how='left')

23/09/26 15:09:47 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


                                                                                

Checking data.

In [4]:
df_flat.limit(5).show()



+-------------------+---------+------+---------+--------+----+-----+--------+------+----+-----+------------+----+-------+---------+--------------+
|     execution_date|advert_id|region|  user_id|platform|mark|model|   price|  year|fuel|color|transmission|body|country|global_id|user_type_name|
+-------------------+---------+------+---------+--------+----+-----+--------+------+----+-----+------------+----+-------+---------+--------------+
|2021-06-26 00:00:00|137514150|  Омск|124207514| unknown|null| null|114000.0|2014.0|null| null|        null|null|   null|124207514|         profi|
|2021-09-20 00:00:00|137514150|  Омск|124207514| unknown|null| null|114000.0|2014.0|null| null|        null|null|   null|124207514|         profi|
|2021-04-03 00:00:00|145314141|   Уфа|124243239| unknown|null| null|    null|  null|null| null|        null|null|   null|124243239|   simple_user|
|2021-06-26 00:00:00|145314141|   Уфа|124243239| unknown|null| null|    null|  null|null| null|        null|null|   nu

                                                                                

Checking number of rows. 

In [5]:
df_flat.count()



23/09/26 15:11:08 WARN MemoryStore: Not enough space to cache broadcast_16 in memory! (computed 304.0 MiB so far)
23/09/26 15:11:08 WARN BlockManager: Persisting block broadcast_16 to disk instead.


                                                                                

23/09/26 15:11:14 WARN MemoryStore: Not enough space to cache broadcast_16 in memory! (computed 304.0 MiB so far)


                                                                                

2771661

Dropping duplicated data, if exists.  

In [6]:
df_flat = df_flat.dropDuplicates(['execution_date', 'advert_id'])

In [7]:
df_flat.count()

                                                                                

2771661

Checking data types of the columns. 

In [8]:
for i in df_flat.dtypes:
    print(i)

('execution_date', 'timestamp')
('advert_id', 'bigint')
('region', 'string')
('user_id', 'bigint')
('platform', 'string')
('mark', 'string')
('model', 'string')
('price', 'double')
('year', 'double')
('fuel', 'string')
('color', 'string')
('transmission', 'string')
('body', 'string')
('country', 'string')
('global_id', 'bigint')
('user_type_name', 'string')


Writing results to S3.

In [9]:
student_directory = 'anikitin8'

df_flat.coalesce(1).write.parquet(f"s3a://kc-hardda-projects/{student_directory}/flat_table/", mode='overwrite')

23/09/26 15:11:37 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

Checking that we have saved data to S3 successfully. 

In [10]:
df_flat_test = spark.read.parquet(f"s3a://kc-hardda-projects/{student_directory}/flat_table/")
df_flat_test.show(3)
df_flat_test.count()

                                                                                

+-------------------+---------+------+---------+--------+----+-----+-----+----+----+-----+------------+----+-------+---------+--------------+
|     execution_date|advert_id|region|  user_id|platform|mark|model|price|year|fuel|color|transmission|body|country|global_id|user_type_name|
+-------------------+---------+------+---------+--------+----+-----+-----+----+----+-----+------------+----+-------+---------+--------------+
|2020-11-12 00:00:00|134709471|  Омск|123482031| desktop|null| null| null|null|null| null|        null|null|   null|123482031|   simple_user|
|2020-11-12 00:00:00|146077599|  Сочи|125256070| desktop|null| null| null|null|null| null|        null|null|   null|125256070|   simple_user|
|2020-11-12 00:00:00|146294791|  Омск|126668838| desktop|null| null| null|null|null| null|        null|null|   null|126668838|         profi|
+-------------------+---------+------+---------+--------+----+-----+-----+----+----+-----+------------+----+-------+---------+--------------+
only s

                                                                                

2771661

## S3

Let's check the name of the parquet file. 

In [11]:
import boto3
from botocore.exceptions import NoCredentialsError


s3 = boto3.client('s3',
                  aws_access_key_id=aws_access_key,
                  aws_secret_access_key=aws_secret_key,
                  endpoint_url=s3_endpoint_url)

response = s3.list_objects_v2(Bucket=s3_bucket, Prefix = student_directory)

if 'Contents' in response:
    print(f"Objects in bucket '{s3_bucket}':")
    for obj in response['Contents']:
        print(f"- {obj['Key']}")
        path = obj['Key']
else:
    print(f"No objects found in bucket {s3_bucket}")

Objects in bucket 'kc-hardda-projects':
- anikitin8/flat_table/_SUCCESS
- anikitin8/flat_table/part-00000-1933d199-bb86-422c-a4e9-dcc7f538ec4c-c000.snappy.parquet


In [12]:
response = s3.head_object(Bucket=s3_bucket, Key=f'{path}')

print(f'File size: {round(response["ContentLength"] / 1024 / 1024, 2)} Mb')

File size: 31.91 Mb


## ClickHouse

### From S3 to Main Table

Creating connection to ClickHouse database.

In [13]:
!pip install clickhouse_driver

Collecting clickhouse_driver
  Using cached clickhouse_driver-0.2.6-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (755 kB)
Collecting tzlocal
  Using cached tzlocal-5.0.1-py3-none-any.whl (20 kB)
Collecting backports.zoneinfo
  Using cached backports.zoneinfo-0.2.1-cp38-cp38-manylinux1_x86_64.whl (74 kB)
Installing collected packages: backports.zoneinfo, tzlocal, clickhouse_driver
Successfully installed backports.zoneinfo-0.2.1 clickhouse_driver-0.2.6 tzlocal-5.0.1


In [14]:
from clickhouse_driver import Client


user_name = 'user_name'
pwd = 'password'

# creating connection ClickHouse
client = Client(host='clickhouse.lab.karpov.courses', port=9000,
                user=user_name, password=pwd, database='hardda_student_data')

# checking connection
result = client.execute("SELECT * FROM hardda.user_dm_events LIMIT 10")

# showing the result
for row in result[0:1]:
    print(row)

(datetime.date(2022, 2, 1), datetime.date(2022, 1, 31), 'android', 'f7411212fd0e2523e126cbfdd3f226c211212', '4beb10e1-aeeb-4c52-acd2-ce1ddbc1fc24b10e1', 22, 11, 3, 0, 0, 0, 2, 2, 0, 0, 0, 0, 0, 0)


Creating ClickHouse table using S3 engine. 

In [35]:
query = '''
DROP TABLE IF EXISTS hardda_student_data.prj_s3_ext_nav2103
'''

In [36]:
client.execute(query)

[]

In [37]:
query = '''
CREATE TABLE hardda_student_data.prj_s3_ext_nav2103 (
    `execution_date` Date,
    `advert_id` UInt64,
    `region` LowCardinality(String),
    `user_id` UInt64,
    `platform` LowCardinality(String),
    `mark` Nullable(String),
    `model` Nullable(String),
    `price` Nullable(UInt64),
    `year` Nullable(UInt16),
    `fuel` LowCardinality(Nullable(String)),
    `color` LowCardinality(Nullable(String)),
    `transmission` LowCardinality(Nullable(String)), 
    `body` LowCardinality(Nullable(String)), 
    `country` LowCardinality(Nullable(String)),
    `global_id` Nullable(UInt64),
    `user_type_name` LowCardinality(Nullable(String))
)
ENGINE = S3 (
    'https://storage.yandexcloud.net/kc-hardda-projects/{}',
    'access_key', 
    'secrete_key',
    'Parquet'
    )
PARTITION BY toStartOfMonth(execution_date) 
ORDER BY (execution_date, advert_id)
PRIMARY KEY (execution_date, advert_id)
'''.format(path)

In [38]:
result = client.execute(query)

Creating ClickHouse table using MergeTree engine.

In [39]:
query = '''
DROP TABLE IF EXISTS hardda_student_data.prj_main_nav2103
'''

In [40]:
client.execute(query)

[]

In [41]:
query = '''
CREATE TABLE hardda_student_data.prj_main_nav2103 (
    `execution_date` Date,
    `advert_id` UInt64,
    `region` LowCardinality(String),
    `user_id` UInt64,
    `platform` LowCardinality(String),
    `mark` Nullable(String),
    `model` Nullable(String),
    `price` Nullable(UInt64),
    `year` Nullable(UInt16),
    `fuel` LowCardinality(Nullable(String)),
    `color` LowCardinality(Nullable(String)),
    `transmission` LowCardinality(Nullable(String)), 
    `body` LowCardinality(Nullable(String)), 
    `country` LowCardinality(Nullable(String)),
    `global_id` Nullable(UInt64),
    `user_type_name` LowCardinality(Nullable(String))
)
ENGINE = MergeTree()
PARTITION BY toStartOfMonth(execution_date) 
ORDER BY (execution_date, advert_id)
PRIMARY KEY (execution_date, advert_id)
SETTINGS index_granularity = 8192
'''

In [42]:
result = client.execute(query)

In [43]:
query = '''
INSERT INTO hardda_student_data.prj_main_nav2103
SELECT
  *
FROM 
  hardda_student_data.prj_s3_ext_nav2103 
'''

In [44]:
result = client.execute(query)

Checking the result.

In [45]:
query = '''
select * from prj_main_nav2103 limit(2)
'''

result = client.execute(query)

print(*result, sep='\n')

(datetime.date(2020, 11, 12), 134709471, 'Омск', 123482031, 'desktop', None, None, None, None, None, None, None, None, None, 123482031, 'simple_user')
(datetime.date(2020, 11, 12), 136066077, 'Омск', 124162590, 'desktop', None, None, None, None, None, None, None, None, None, 124162590, 'simple_user')


Creating materialized view _agg1. 

### Aggregating Materialize View 1

In [71]:
query = '''
DROP TABLE IF EXISTS hardda_student_data.nav2103_agg_1
'''

client.execute(query)

[]

Creating table with AggregatingMergeTree engine. 

In [72]:
query = '''
CREATE TABLE hardda_student_data.nav2103_agg_1 (
    `execution_date` Date,
    `platform` LowCardinality(String),
    `user_type_name` LowCardinality(Nullable(String)),
    `min_price` AggregateFunction(min, Nullable(UInt64)),
    `quantile_0_25` AggregateFunction(quantile(0.25), Nullable(UInt64)),
    `median_price` AggregateFunction(quantile(0.5), Nullable(UInt64)),
    `avg_price` AggregateFunction(avg, Nullable(UInt64)),
    `quantile_0_75` AggregateFunction(quantile(0.75), Nullable(UInt64)),
    `max_price` AggregateFunction(max, Nullable(UInt64))
)
ENGINE = AggregatingMergeTree
ORDER BY (execution_date, platform)
'''

client.execute(query)

[]

In [73]:
query = '''
DROP TABLE IF EXISTS hardda_student_data.prj_main_nav2103_agg_view_1
'''

client.execute(query)

[]

Creating Materialized View based on the table with AggregatingMergeTree engine. 

In [74]:
query = '''
CREATE MATERIALIZED VIEW hardda_student_data.prj_main_nav2103_agg_view_1
TO hardda_student_data.nav2103_agg_1 AS
SELECT
    execution_date,
    platform,
    user_type_name,
    minState(price) AS min_price,
    quantileState(0.25)(price) AS quantile_0_25,
    quantileState(0.5)(price) AS median_price,
    avgState(price) AS avg_price,
    quantileState(0.75)(price) AS quantile_0_75,
    maxState(price) AS max_price
FROM
    hardda_student_data.prj_main_nav2103
GROUP BY 
    execution_date,
    platform,
    user_type_name
'''

client.execute(query)

[]

Inserting data to our AggregatingMergeTree engine table. 

In [75]:
query = '''
INSERT INTO hardda_student_data.nav2103_agg_1
SELECT
    execution_date,
    platform,
    user_type_name,
    minState(price) AS min_price,
    quantileState(0.25)(price) AS quantile_0_25,
    quantileState(0.5)(price) AS median_price,
    avgState(price) AS avg_price,
    quantileState(0.75)(price) AS quantile_0_75,
    maxState(price) AS max_price
FROM
    hardda_student_data.prj_main_nav2103
GROUP BY 
    execution_date,
    platform,
    user_type_name
'''

client.execute(query)

[]

Checking the result. 

In [26]:
query = '''
SELECT
    execution_date,
    platform,
    user_type_name,
    minMerge(min_price) AS min_price,
    maxMerge(max_price) AS max_price,
    avgMerge(avg_price) AS avg_price
FROM 
    hardda_student_data.prj_main_nav2103_agg_view_1
GROUP BY 
    execution_date,
    platform,
    user_type_name
ORDER BY execution_date
LIMIT(5)
'''

In [27]:
result = client.execute(query)

for i in result:
    print(i)

(datetime.date(2020, 11, 12), 'unknown', 'profi', 2000, 2000, 2000.0)
(datetime.date(2020, 11, 12), 'mobile', 'profi', 0, 60000, 7508.484375)
(datetime.date(2020, 11, 12), 'ios', None, 0, 8100000, 50516.337630143775)
(datetime.date(2020, 11, 12), 'android', 'simple_user', 0, 5180000, 89786.885625966)
(datetime.date(2020, 11, 12), 'desktop', None, 0, 8900000, 555575.692926045)


Is it true that `profi` users average price is bigger than `simple_user` average price?

In [15]:
query = '''
SELECT
    user_type_name,
    avgMerge(avg_price) avg_hits_per_hour
FROM 
    hardda_student_data.prj_main_nav2103_agg_view_1
GROUP BY 
    user_type_name
ORDER BY 
    user_type_name
'''

In [16]:
result = client.execute(query)

for i in result:
    print(i)

('avtosalon', 533367.1845347529)
('profi', 261974.95002196028)
('simple_user', 251042.5140967816)
(None, 232742.0624330976)


### Aggregating Materialize View 2

In [21]:
query = '''
DROP TABLE IF EXISTS hardda_student_data.nav2103_agg_2
'''

client.execute(query)

[]

Creating table with AggregatingMergeTree engine. 

In [22]:
query = '''
CREATE TABLE hardda_student_data.nav2103_agg_2 (
    `advert_id` UInt64,
    `user_id` UInt64,
    `mark` LowCardinality(Nullable(String)),
    `model` LowCardinality(Nullable(String)),
    `min_date` AggregateFunction(min, Date),
    `max_date` AggregateFunction(max, Date),
    `min_price` AggregateFunction(min, Nullable(UInt64)),
    `median_price` AggregateFunction(quantile(0.5), Nullable(UInt64)),
    `max_price` AggregateFunction(max, Nullable(UInt64))
)
ENGINE = AggregatingMergeTree
ORDER BY (advert_id, user_id, mark, model)
'''

client.execute(query)

[]

In [23]:
query = '''
DROP TABLE IF EXISTS hardda_student_data.prj_main_nav2103_agg_view_2
'''

client.execute(query)

[]

Creating Materialized View based on the table with AggregatingMergeTree engine. 

In [24]:
query = '''
CREATE MATERIALIZED VIEW hardda_student_data.prj_main_nav2103_agg_view_2
TO hardda_student_data.nav2103_agg_2 AS
SELECT
    advert_id, 
    user_id,
    mark,
    model,
    minState(execution_date) AS min_date,
    maxState(execution_date) AS max_date,
    minState(price) AS min_price,
    quantileState(0.5)(price) AS median_price,
    maxState(price) AS max_price
FROM
    hardda_student_data.prj_main_nav2103
GROUP BY 
    advert_id, 
    user_id,
    mark,
    model
'''

client.execute(query)

[]

Inserting data to our AggregatingMergeTree engine table. 

In [25]:
query = '''
INSERT INTO hardda_student_data.nav2103_agg_2
SELECT
    advert_id, 
    user_id,
    mark,
    model,
    minState(execution_date) AS min_date,
    maxState(execution_date) AS max_date,
    minState(price) AS min_price,
    quantileState(0.5)(price) AS median_price,
    maxState(price) AS max_price
FROM
    hardda_student_data.prj_main_nav2103
GROUP BY 
    advert_id, 
    user_id,
    mark,
    model
'''

client.execute(query)

[]

Checking the result. 

In [30]:
query = '''
SELECT
    advert_id, 
    user_id,
    mark,
    model,
    minMerge(min_price) AS min_price,
    maxMerge(max_price) AS max_price,
    minMerge(min_date) AS min_date
FROM 
    hardda_student_data.prj_main_nav2103_agg_view_2
GROUP BY 
    advert_id, 
    user_id,
    mark,
    model
ORDER BY mark
LIMIT(5)
'''

In [31]:
result = client.execute(query)

for i in result:
    print(i)

(269622970, 125442172, 'Alfa Romeo', '159', 1260000, 1318000, datetime.date(2021, 10, 23))
(270284241, 135472405, 'Aro', '24', 160000, 160000, datetime.date(2021, 11, 9))
(270343387, 124090146, 'Audi', 'A6', 840000, 840000, datetime.date(2021, 11, 11))
(264975487, 131539512, 'Audi', '100', 420000, 520000, datetime.date(2021, 7, 11))
(259842140, 134708515, 'Audi', 'A6', 540000, 590000, datetime.date(2021, 3, 21))


Define top-5 the fastest selling marks?

In [78]:
query = '''
WITH delta_days AS (
SELECT
    mark,
    advert_id,
    maxMerge(max_date) - minMerge(min_date) AS days_delta
FROM 
    hardda_student_data.prj_main_nav2103_agg_view_2
WHERE
    mark is not null
GROUP BY 
    mark,
    advert_id
)

SELECT
    mark,
    round(avg(days_delta), 0) AS avg_delta
FROM 
    delta_days
GROUP BY 
    mark
ORDER BY 
    avg_delta 
LIMIT(5)
'''

In [79]:
result = client.execute(query)

for i in result:
    print(i)

('Aro', 1.0)
('SEAT', 4.0)
('Datsun', 7.0)
('DongFeng', 10.0)
('Alfa Romeo', 14.0)


Define top-5 the slowest selling marks?

In [76]:
query = '''
WITH delta_days AS (
SELECT
    mark,
    advert_id,
    maxMerge(max_date) - minMerge(min_date) AS days_delta
FROM 
    hardda_student_data.prj_main_nav2103_agg_view_2
WHERE
    mark is not naull
GROUP BY 
    mark,
    advert_id
)

SELECT
    mark,a
    round(avg(days_delta), 0) AS avg_delta
FROM 
    delta_days
GROUP BY 
    mark
ORDER BY 
    avg_delta DESC
LIMIT(5)
'''

In [77]:
result = client.execute(query)

for i in result:
    print(i)

('Ретро-автомобили', 215.0)
('Scion', 213.0)
('Rover', 195.0)
('Great Wall', 184.0)
('MG', 177.0)


Determine what share of the total base these groups make up (combined TOP-5 models with the fastest sales and TOP-5 brands with the slowest sales)?

In [166]:
query = '''
WITH ten_marks_cnt AS (
SELECT
    mark,
    count(distinct(advert_id)) AS cnt_ads
FROM 
    hardda_student_data.prj_main_nav2103_agg_view_2
WHERE 1=1
    AND mark in ('Ретро-автомобили', 'Scion', 'Rover', 'Great Wall', 'MG',
                 'Aro', 'SEAT', 'Datsun', 'DongFeng', 'Alfa Romeo')
GROUP BY 
    mark
)

SELECT 
  round(sum(cnt_ads) / (SELECT count(distinct(advert_id)) FROM hardda_student_data.prj_main_nav2103_agg_view_2) * 100, 2)
FROM 
  ten_marks_cnt
'''

In [167]:
result = client.execute(query)

for i in result:
    print(i)

(0.03,)


Determine what share of the total number of listers these groups make up (combined TOP-5 models with the fastest sales and TOP-5 brands with the slowest sales)?

In [168]:
query = '''
WITH ten_marks_cnt AS (
SELECT
    mark,
    count(distinct(user_id)) AS cnt_ads
FROM 
    hardda_student_data.prj_main_nav2103_agg_view_2
WHERE 1=1
    AND mark in ('Ретро-автомобили', 'Scion', 'Rover', 'Great Wall', 'MG',
                 'Aro', 'SEAT', 'Datsun', 'DongFeng', 'Alfa Romeo')
GROUP BY 
    mark
)

SELECT 
  round(sum(cnt_ads) / (SELECT count(distinct(user_id)) FROM hardda_student_data.prj_main_nav2103_agg_view_2) * 100, 2)
FROM 
  ten_marks_cnt
'''

In [169]:
result = client.execute(query)

for i in result:
    print(i)

(0.18,)


### Aggregating Materialize View 3

tbc..