# CONFIG

In [147]:
import os

# Source parquet file and the HDFS directory it is staged into.
SOURCE_DATA = 'https://storage.yandexcloud.net/hackathon/events-2022-Sep-30-2134.parquet'
STAGE_DIR = '/user/ht2/master/data'

# PostgreSQL connection settings.
DB_HOST = '158.160.116.182'
DB_PORT = '5432'
# Credentials can be supplied via environment variables so they need not be
# committed with the notebook; the defaults keep the previous hard-coded values.
DB_USER = os.environ.get('DB_USER', "de_student")
DB_PASSWORD = os.environ.get('DB_PASSWORD', "de_student")
DB_NAME = 'de_student'
# PostgreSQL JDBC URL layout is jdbc:postgresql://<host>:<port>/<database>.
# The port slot must hold DB_PORT (previously DB_USER was interpolated here,
# producing an unusable URL).
JDBC_URL = f"jdbc:postgresql://{DB_HOST}:{DB_PORT}/{DB_NAME}"

# SQL scripts executed through psycopg2 later in the notebook.
DB_INIT_SCHEMA = '/home/ubuntu/jupyter/db_initialization.sql'
CDM_INSERT_SCHEMA = '/home/ubuntu/jupyter/cdm_insert.sql'

# Source to stage

In [123]:
# Create the HDFS stage directory (including missing parents); `-p` also keeps
# this from failing when the directory already exists.
!hdfs dfs -mkdir -p {STAGE_DIR}

In [None]:
!hdfs dfs -cp {SOURCE_DATA} {STAGE_DIR}

# Prepare the PostgreSQL DB (if it does not exist yet)

In [148]:
import psycopg2

In [149]:
# One psycopg2 connection shared by the schema-init and CDM cells; it is
# closed in the final cell of the notebook.
connection = psycopg2.connect(
    database=DB_NAME,
    user=DB_USER,
    password=DB_PASSWORD,
    host=DB_HOST,
    port=DB_PORT,
)

In [None]:
# Run the DB initialisation script.
# - `with open(...)` guarantees the script's file handle is closed.
# - `with connection:` (psycopg2) commits when the block succeeds and rolls back
#   if it raises, so a failing script does not leave the connection stuck in an
#   aborted transaction. It does NOT close the connection.
with open(DB_INIT_SCHEMA, "r") as init_sql_file:
    init_sql = init_sql_file.read()

with connection, connection.cursor() as cursor:
    cursor.execute(init_sql)

# Stage to DDS

In [113]:
import os
# Spark on YARN reads the cluster's client configuration from the directories
# named by these variables, so they are set before any Spark session exists.
hadoop_client_conf = '/etc/hadoop/conf'
os.environ.update(
    HADOOP_CONF_DIR=hadoop_client_conf,
    YARN_CONF_DIR=hadoop_client_conf,
)

# findspark locates the Spark installation and makes pyspark importable here.
import findspark
findspark.init()
findspark.find()

import pyspark
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

In [17]:
# YARN-backed Spark session for the stage -> DDS load (driver: 2 cores, 2g memory).
spark = (
    SparkSession.builder
    .appName("stage_to_dds")
    .master("yarn")
    .config("spark.driver.cores", "2")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023-09-16 08:12:20,771 WARN util.Utils: spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs.
2023-09-16 08:12:33,049 WARN util.Utils: spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs.
2023-09-16 08:12:33,063 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpo

In [128]:
# Load everything staged in HDFS (parquet) as the raw events frame.
df = spark.read.format("parquet").load(STAGE_DIR)

In [111]:
# Enrich staged events for the DDS layer:
#   event_timestamp     -> parsed into a timestamp
#   date                -> that timestamp cut to midnight via a 'yyyy-MM-dd' round trip
#   hour                -> hour of day ('HH') as an integer
#   event_type_from_url -> alphanumeric run following the first "com/" in page_url
#                          (regexp_extract yields '' when there is no match)
ts_col = F.col("event_timestamp")
url_section_pattern = r"com\/([^\W_]+)"

df_dds = (
    df
    .withColumn("event_timestamp", F.to_timestamp("event_timestamp"))
    .withColumn("date", F.to_timestamp(F.date_format(ts_col, "yyyy-MM-dd")))
    .withColumn("hour", F.date_format(ts_col, "HH").cast(IntegerType()))
    .withColumn("event_type_from_url", F.regexp_extract("page_url", url_section_pattern, 1))
)

In [112]:
# Load the enriched events into dds.event_log over JDBC.
# NOTE(review): mode "append" means re-running this cell writes the same rows
# again (or fails, if the table enforces uniqueness) — confirm this is intended.
(
    df_dds.write
    .format("jdbc")
    .mode("append")
    .options(
        url=JDBC_URL,
        driver="org.postgresql.Driver",
        dbtable="dds.event_log",
        user=DB_USER,
        password=DB_PASSWORD,
    )
    .save()
)

                                                                                

# Fill CDM

In [None]:
# Populate the CDM layer by running the CDM insert script.
# - `with open(...)` guarantees the script's file handle is closed.
# - `with connection:` (psycopg2) commits on success and rolls back on error,
#   so a failed insert does not leave the connection in an aborted transaction.
#   It does NOT close the connection.
with open(CDM_INSERT_SCHEMA, "r") as cdm_sql_file:
    cdm_sql = cdm_sql_file.read()

with connection, connection.cursor() as cursor:
    cursor.execute(cdm_sql)

In [150]:
# Release external resources now that the pipeline is done: the PostgreSQL
# connection, and the YARN-backed Spark session, which otherwise stays alive
# (holding its cluster resources) for as long as the kernel runs.
connection.close()
spark.stop()