In [6]:
# import pyspark.sql.functions as f
# from pyspark.sql import SparkSession
# from pyspark.sql.functions import col,from_json
# from pyspark.sql.types import StringType, StructType, StructField, LongType, ArrayType, MapType,IntegerType,TimestampType
# from pyspark.sql.functions import *

from util.config import Config
from util.logger import Log4j

import psycopg2

from pyspark.sql import SparkSession

conf = Config()
spark_conf = conf.spark_conf
kafka_conf = conf.kafka_conf
# Misspelled alias kept because the Kafka reader cell passes `kaka_conf` to `.options()`.
kaka_conf = kafka_conf
postgres_conf = conf.postgres_conf

# `spark` is used by every later cell, so it has to be created here for the notebook
# to survive Restart & Run All. getOrCreate() returns an already-running session if
# the kernel has one, so re-running this cell does not start a second session.
spark = (
    SparkSession.builder
    .config(conf=spark_conf)
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,org.postgresql:postgresql:42.7.3")
    .getOrCreate()
)

# log = Log4j(spark)
# log.info(f"spark_conf: {spark_conf.getAll()}")
# log.info(f"kafka_conf: {kaka_conf.items()}")

util_dir: /spark/12-project/util


In [2]:
def connect_to_postgres():
    """Open a psycopg2 connection using the settings held in ``postgres_conf``.

    Returns:
        The open connection, or ``None`` if anything goes wrong (the error is
        printed rather than raised, so callers must check for ``None``).
    """
    try:
        settings = {
            name: postgres_conf[name]
            for name in ("dbname", "user", "password", "host", "port")
        }
        connection = psycopg2.connect(**settings)
    except Exception as error:
        print("Error connecting to PostgreSQL:", error)
        return None
    print("Connection to PostgreSQL established.")
    return connection
        
def create_table(connection, create_table_query):
    """Execute a (possibly multi-statement) DDL script and commit it.

    Failures are reported with ``print`` and not re-raised; this is best-effort
    setup, as before. On failure the transaction is rolled back so the
    connection is left usable.

    Args:
        connection: An open DB-API connection (psycopg2), or ``None``.
        create_table_query: SQL text to execute.
    """
    # Initialised up front so the ``finally`` clause never touches an unbound
    # name when ``connection.cursor()`` itself fails (e.g. connection is None).
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute(create_table_query)
        connection.commit()
        print("Table created successfully.")
    except Exception as e:
        print("Error creating table:", e)
        if connection is not None:
            try:
                # psycopg2 keeps a failed transaction open; roll it back so later
                # statements on this connection do not fail with "transaction aborted".
                connection.rollback()
            except Exception:
                pass  # connection already broken/closed; the original error was printed above
    finally:
        if cursor is not None:
            cursor.close()
        
def save_to_postgres(df, dbtable):
    """Append a Spark DataFrame to a PostgreSQL table over JDBC.

    Connection details come from ``postgres_conf`` (the same settings
    ``connect_to_postgres`` uses) instead of a hard-coded URL and credentials,
    so the JDBC writes and the psycopg2 DDL always target the same database.

    Args:
        df: Spark DataFrame whose column names match the target table's columns.
        dbtable: Name of the existing target table.
    """
    jdbc_url = (
        f"jdbc:postgresql://{postgres_conf['host']}:{postgres_conf['port']}"
        f"/{postgres_conf['dbname']}"
    )
    (
        df.write
        .format("jdbc")
        .option("driver", "org.postgresql.Driver")
        .option("url", jdbc_url)
        .option("dbtable", str(dbtable))
        .option("user", postgres_conf['user'])
        .option("password", postgres_conf['password'])
        .mode("append")
        .save()
    )

In [3]:
def insert_to_table(partition, insert_query, columns, connect_fn=None):
    """Insert every row of one Spark partition into PostgreSQL in a single transaction.

    Intended as the body of ``DataFrame.foreachPartition``. The partition is
    all-or-nothing: any failure rolls the transaction back and re-raises, so
    Spark's task retry starts from a clean state (important for the
    ``ON CONFLICT ... DO UPDATE`` fact upsert, which would otherwise double count).
    The connection and cursor are always closed.

    Args:
        partition: Iterable of rows exposing ``columns`` as attributes (Spark ``Row``).
        insert_query: Parameterised SQL with one ``%s`` per entry in ``columns``.
        columns: Attribute names to read from each row, in placeholder order.
        connect_fn: Optional zero-argument callable returning a DB-API
            connection; defaults to ``psycopg2.connect`` with ``postgres_conf``.

    Raises:
        Whatever the driver raises on connect, execute or commit.
    """
    if connect_fn is not None:
        conn = connect_fn()
    else:
        conn = psycopg2.connect(
            dbname=postgres_conf['dbname'],
            user=postgres_conf['user'],
            password=postgres_conf['password'],
            host=postgres_conf['host'],
            port=postgres_conf['port']
        )
    try:
        cursor = conn.cursor()
        try:
            for row in partition:
                values = [getattr(row, name) for name in columns]
                cursor.execute(insert_query, values)
        finally:
            cursor.close()
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

In [4]:
def build_insert_query(table, columns, conflict_key, on_conflict="DO NOTHING"):
    """Return a psycopg2-parameterised ``INSERT ... ON CONFLICT`` statement.

    Args:
        table: Target table name (trusted, code-supplied identifier).
        columns: Column names, in the order their values will be bound.
        conflict_key: Column used in ``ON CONFLICT (...)``.
        on_conflict: Conflict action, e.g. ``"DO NOTHING"`` or ``"DO UPDATE SET ..."``.
    """
    column_names = ",".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    return (f"INSERT INTO {table} ({column_names}) VALUES ({placeholders}) "
            f"ON CONFLICT ({conflict_key}) {on_conflict}")


def _insert_partitions(df, table, columns, conflict_key, on_conflict="DO NOTHING"):
    """Upsert ``df`` into ``table`` by running ``insert_to_table`` on every partition."""
    insert_query = build_insert_query(table, columns, conflict_key, on_conflict)
    df.foreachPartition(lambda partition: insert_to_table(partition, insert_query, columns))


def insert_to_dim_browser(df_browser):
    """Insert (browser_key, browser_name) rows, skipping keys that already exist."""
    _insert_partitions(df_browser, "dim_browser", ["browser_key", "browser_name"], "browser_key")


def insert_to_dim_os(df_os):
    """Insert (os_key, os_name) rows, skipping keys that already exist."""
    _insert_partitions(df_os, "dim_operating_system", ["os_key", "os_name"], "os_key")


def insert_to_dim_reference(df_reference):
    """Insert reference-domain rows, skipping keys that already exist."""
    _insert_partitions(df_reference, "dim_reference_domain",
                       ["reference_key", "reference_domain", "is_self_reference"],
                       "reference_key")


def insert_to_fact_view(df_fact_view):
    """Upsert fact rows; an existing ``key`` has its ``total_view`` incremented."""
    _insert_partitions(
        df_fact_view,
        "fact_view",
        ["key", "date_key", "location_key", "product_key", "store_id",
         "reference_key", "browser_key", "os_key", "total_view"],
        "key",
        "DO UPDATE SET total_view = fact_view.total_view + EXCLUDED.total_view",
    )

In [7]:
# Drop and recreate every dimension and fact table of the star schema.
# WARNING: the DROP TABLE statements discard any data already loaded.
create_table_query = """
DROP TABLE IF EXISTS Dim_Date;
CREATE TABLE  Dim_Date (
    datetime_key INT PRIMARY KEY,
    full_date DATE,
    day_of_week VARCHAR(10),
    day_of_week_short VARCHAR(10),
    hour INT,
    day_of_month INT,
    month INT,
    year INT
);

DROP TABLE IF EXISTS Dim_Product;
CREATE TABLE  Dim_Product (
    product_key INT PRIMARY KEY,
    product_name VARCHAR(50)
);

DROP TABLE IF EXISTS Dim_Location;
CREATE TABLE  Dim_Location (
    location_key INT PRIMARY KEY,
    country_iso2 VARCHAR(20),
    country_iso3 VARCHAR(20) ,
    country_name VARCHAR(50),
    country_nicename VARCHAR(50),
    country_numcode INT,
    country_phonecode INT
);

DROP TABLE IF EXISTS Dim_Reference_Domain;
CREATE TABLE  Dim_Reference_Domain (
    reference_key INT PRIMARY KEY,
    reference_domain VARCHAR(255),
    is_self_reference BOOLEAN
);

DROP TABLE IF EXISTS Dim_Operating_System;
CREATE TABLE  Dim_Operating_System (
    os_key INT PRIMARY KEY,
    os_name VARCHAR(50)
);

DROP TABLE IF EXISTS Dim_Browser;
CREATE TABLE  Dim_Browser (
    browser_key INT PRIMARY KEY,
    browser_name VARCHAR(50)
);

DROP TABLE IF EXISTS Fact_View;
CREATE TABLE  Fact_View (
    key VARCHAR(255) PRIMARY KEY, 
    product_key INT NOT NULL,
    location_key INT NOT NULL,
    date_key INT NOT NULL,
    reference_key INT NOT NULL,
    os_key INT NOT NULL,
    browser_key INT NOT NULL,
    store_id INT,
    total_view INT
);
"""

# connect_to_postgres() returns None instead of raising, so fail loudly here rather
# than with an AttributeError further down; always release the connection afterwards.
connection = connect_to_postgres()
if connection is None:
    raise RuntimeError("Could not connect to PostgreSQL; the star-schema tables were not created.")
try:
    create_table(connection, create_table_query)
finally:
    connection.close()

Connection to PostgreSQL established.
Table created successfully.


In [None]:
# Product-view events stored as Parquet; the batch analysis below starts from this frame.
df_behavior = spark.read.parquet('/data/data_behavior/product_view')

In [None]:
import sys
def _approx_sizeof(obj):
    """Approximate in-memory size of ``obj`` in bytes, following lists, tuples and dicts.

    ``sys.getsizeof`` alone is shallow: for a Spark ``Row`` (a tuple subclass) it
    only counts the tuple header and element pointers, not the field values.
    Objects referenced from several containers are counted once per reference.
    """
    size_bytes = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size_bytes += sum(_approx_sizeof(k) + _approx_sizeof(v) for k, v in obj.items())
    elif isinstance(obj, (list, tuple)):
        size_bytes += sum(_approx_sizeof(item) for item in obj)
    return size_bytes


def get_partition_size(index, iterator):
    """Estimate the Python-object size of one RDD partition.

    Intended for ``rdd.mapPartitionsWithIndex``: returns ``[(index, approx_bytes)]``
    so that ``collect()`` yields one ``(partition_index, bytes)`` pair per partition.
    The figure is the size of the deserialized Python rows, not of the on-disk data.
    """
    return [(index, sum(_approx_sizeof(row) for row in iterator))]
    
# One (partition_index, bytes) pair per partition, printed in MiB.
partition_sizes = df_behavior.rdd.mapPartitionsWithIndex(get_partition_size).collect()
for partition_index, size_bytes in partition_sizes:
    size_mb = size_bytes / (1024 * 1024)
    print(f"Partition {partition_index}: {size_mb:.2f} MB")

In [None]:
# CREATE DIM DATE

def create_dim_date(spark, start="2024-01-01 00:00:00", end="2025-01-01 00:00:00"):
    """Build the hourly date dimension and append it to ``dim_date``.

    One row is produced per hour from ``start`` to ``end`` (both inclusive).
    The defaults reproduce the original range: 2024-01-01 00:00 through 2025-01-01 00:00.

    Args:
        spark: Active SparkSession.
        start: First timestamp, as a string ``to_timestamp`` can parse.
        end: Last timestamp (inclusive), same format.

    NOTE(review): the sequence steps in absolute time, so if the session time zone
    observes DST a wall-clock hour can repeat and ``datetime_key`` (the primary key)
    would collide — confirm the session time zone is UTC or DST-free.
    """
    from pyspark.sql.functions import (
        col, date_format, dayofmonth, dayofweek, explode, expr, hour, lit, month,
        sequence, to_timestamp, year,
    )

    # A single row holding the array of hourly timestamps, exploded into one row per hour.
    df_hours = spark.range(1).select(
        explode(
            sequence(to_timestamp(lit(start)), to_timestamp(lit(end)), expr("interval 1 hour"))
        ).alias("timestamp")
    )

    df_dim_date = (
        df_hours
        .withColumn('datetime_key', date_format("timestamp", "yyyyMMddHH").cast('int'))
        .withColumn('full_date', col('timestamp').cast('date'))
        .withColumn("day_of_week", date_format("full_date", "EEEE"))
        # NOTE(review): dayofweek() yields 1 (Sunday) .. 7 (Saturday), stored in a VARCHAR
        # column named "short" — confirm a number rather than e.g. "Mon" is intended.
        .withColumn('day_of_week_short', dayofweek("full_date"))
        .withColumn('day_of_month', dayofmonth("full_date"))
        .withColumn('month', month("full_date"))
        .withColumn('year', year("full_date"))
        .withColumn('hour', hour("timestamp"))
        .drop("timestamp")
    )

    save_to_postgres(df_dim_date, 'dim_date')

In [None]:
# CREATE DIM LOCATION

def create_dim_location(spark):
    """Build ``dim_location`` from ``/data/country.csv`` plus an 'Undefined' row, and append it.

    ``location_key`` is ``abs(hash(iso))``, the same expression the fact pipeline
    applies to the derived country code. Facts whose code resolves to
    'Undefined' therefore join to the extra row added here.
    """
    import pyspark.sql.functions as F
    from pyspark.sql.types import IntegerType, StringType, StructField, StructType

    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("iso", StringType(), True),
        StructField("name", StringType(), True),
        StructField("nicename", StringType(), True),
        StructField("iso3", StringType(), True),
        StructField("numcode", IntegerType(), True),
        StructField("phonecode", IntegerType(), True)
    ])
    df_location = (
        spark.read
        .format('csv')
        .option('header', True)
        .schema(schema)
        .option('path', '/data/country.csv')
        .load()
    )

    # Fallback row for facts whose country could not be resolved. Built with the
    # CSV schema so column names and types line up for unionByName.
    undefined_row = spark.createDataFrame(
        [(0, 'Undefined', 'Undefined', 'Undefined', 'Undefined', -1, -1)], schema
    )

    df_location_final = (
        df_location
        .unionByName(undefined_row)
        .orderBy('id')
        .withColumn('location_key', F.abs(F.hash('iso')))
        .selectExpr("location_key",
                    "iso AS country_iso2",
                    "iso3 AS country_iso3",
                    "name AS country_name",
                    "nicename AS country_nicename",
                    "numcode AS country_numcode",
                    "phonecode AS country_phonecode")
    )

    save_to_postgres(df_location_final, 'dim_location')
    

In [None]:
# CREATE DIM PRODUCT
def create_dim_product(spark):
    """Load ``/data/dim_product.csv`` (header row; ``id`` int, ``name`` string) into ``dim_product``.

    ``id``/``name`` are renamed to ``product_key``/``product_name``.

    NOTE(review): the fact pipeline maps a missing product_id to -1; confirm the
    CSV contains a -1 row, otherwise those facts have no matching dimension row.
    """
    from pyspark.sql.types import IntegerType, StringType, StructField, StructType

    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
    ])

    df_product = (
        spark.read
        .format('csv')
        .option('header', True)
        .schema(schema)
        .option('path', '/data/dim_product.csv')
        .load()
    )

    df_product_final = df_product.selectExpr("id AS product_key", "name AS product_name")
    save_to_postgres(df_product_final, "dim_product")

In [None]:
from pyspark.sql.functions import col, split, regexp_extract,size,upper
from pyspark.sql.functions import hash,abs,col,when,concat,sha2,md5,coalesce,lit

extract_current_domain = split(col("current_url"),"/")[2]
extract_reference_domain = split(col("referrer_url"),"/")[2]
num_parts = size(split(extract_current_domain, r"\."))



df_behavior_extract_domain  = df_behavior \
.withColumn('current_domain',extract_current_domain) \
.withColumn('reference_domain',extract_reference_domain)

In [None]:
#HANDLE FACT VIEW

from pyspark.sql.functions import *

# NOTE(review): `parse(ua).browser.family` / `.os.family` is the API of the
# `user_agents` package; `parse` was never imported in this notebook — confirm the package.
from user_agents import parse

# Row cap for quick experiments on this cell; None processes the whole dataset.
ROW_LIMIT = None

# GENKEY LOCATION
# Upper-cased last label of the current host name, e.g. "www.example.de" -> "DE".
extract_coutnry_code = upper(split(extract_current_domain,r"\.")[num_parts -1])
# Map generic TLDs to a country; missing or empty codes go to the 'Undefined' location row.
fix_country_code =  expr("""CASE
                                WHEN country_code = 'COM' THEN 'US'
                                WHEN country_code = 'AFRICA'  THEN 'BF'
                                WHEN country_code = 'MEDIA' THEN 'LY'
                                WHEN country_code = 'STORE' THEN 'CU'
                                WHEN country_code IS NULL OR country_code = '' THEN 'Undefined'
                                ELSE country_code
                            END AS country_code
                        """)
gen_location_key = abs(hash('country_code'))

#GENKEY DATE
# Hour-level key matching dim_date.datetime_key (yyyyMMddHH).
gen_date_key = date_format("local_time", "yyyyMMddHH").cast('int')

#GENKEY REFERENCE
handle_refernce_null = expr("IFNULL(reference_domain,'Undefined') AS reference_domain")
is_self_reference = expr(""" CASE 
                                WHEN current_domain = reference_domain THEN True
                                ELSE False
                            END AS is_self_reference
                        """)
gen_reference_key = abs(hash('reference_domain'))

#GENKEY BROWSER
def parse_browser(ua):
    """Browser family of a raw User-Agent string; a missing string yields None."""
    if ua is None:
        return None
    return parse(ua).browser.family

parse_browser_udf = udf(parse_browser, returnType=StringType())
gen_browser_key = abs(hash('browser'))

#GENKEY OS
def parse_os(ua):
    """Operating-system family of a raw User-Agent string; a missing string yields None."""
    if ua is None:
        return None
    return parse(ua).os.family

parse_os_udf = udf(parse_os, returnType=StringType())
gen_os_key = abs(hash('os'))

#HANDLE PRODUCT
handle_null_product_id = expr("IFNULL(product_id,-1)")

df_behavior_source = (
    df_behavior_extract_domain if ROW_LIMIT is None
    else df_behavior_extract_domain.limit(ROW_LIMIT)
)

df_behavior_genkey = df_behavior_source \
.withColumn('country_code',extract_coutnry_code) \
.withColumn('country_code',fix_country_code) \
.withColumn('location_key',gen_location_key) \
.withColumn('date_key',gen_date_key) \
.withColumn('reference_domain',handle_refernce_null) \
.withColumn('is_self_reference',is_self_reference) \
.withColumn('reference_key',gen_reference_key) \
.withColumn('browser',parse_browser_udf("user_agent")) \
.withColumn('browser_key',gen_browser_key) \
.withColumn('os',parse_os_udf("user_agent")) \
.withColumn('os_key',gen_os_key) \
.withColumn("product_id",handle_null_product_id) \
.withColumnRenamed("product_id","product_key")

In [None]:
%%time
from pyspark.sql.functions import concat_ws

# Dimension columns identifying one fact row, in the order they feed the key hash.
fact_key_columns = ["date_key", "location_key", "product_key", "store_id",
                    "reference_key", "browser_key", "os_key"]

# The "|" separator keeps the key unambiguous. With plain concat, e.g.
# location_key=12/product_key=345 and location_key=123/product_key=45 produce the
# same string, and ON CONFLICT (key) would merge two different fact rows.
gen_fact_key = md5(
    concat_ws("|", *[coalesce(col(name).cast("string"), lit("")) for name in fact_key_columns])
)

fact_view = (
    df_behavior_genkey
    .groupBy(*fact_key_columns)
    .agg(expr("count(*) AS total_view"))
    .withColumn("key", gen_fact_key)
)
insert_to_fact_view(fact_view)

In [None]:
## STORE DIM BROWSER
# One row per distinct (browser_key, browser_name) pair.
df_dim_browser = (
    df_behavior_genkey
    .select(col("browser_key"), col("browser").alias("browser_name"))
    .dropDuplicates()
)
insert_to_dim_browser(df_dim_browser)


In [None]:
## STORE DIM OS
# One row per distinct (os_key, os_name) pair.
df_dim_os = (
    df_behavior_genkey
    .select(col("os_key"), col("os").alias("os_name"))
    .dropDuplicates()
)
insert_to_dim_os(df_dim_os)


In [None]:
## STORE DIM REFER
# NOTE(review): is_self_reference depends on the visited page, so one reference_key can
# appear with both True and False; ON CONFLICT DO NOTHING keeps whichever lands first.
reference_columns = ["reference_key", "reference_domain", "is_self_reference"]
df_dim_refer = df_behavior_genkey.select(*reference_columns).dropDuplicates()
insert_to_dim_reference(df_dim_refer)

In [9]:
from pyspark.sql import DataFrame

def _add_dimension_keys(df: DataFrame) -> DataFrame:
    """Add star-schema surrogate keys and descriptive columns to raw behaviour events.

    Reads ``current_url``, ``referrer_url``, ``local_time``, ``user_agent`` and
    ``product_id``; all other input columns pass through. ``product_id`` becomes
    ``product_key`` (null -> -1).
    """
    import pyspark.sql.functions as F
    from pyspark.sql.types import StringType
    # NOTE(review): `parse(ua).browser.family` matches the `user_agents` package API — confirm.
    from user_agents import parse

    # "https://host/path".split("/")[2] is the host name.
    current_domain = F.split(F.col("current_url"), "/")[2]
    reference_domain = F.split(F.col("referrer_url"), "/")[2]
    domain_labels = F.split(current_domain, r"\.")
    # Upper-cased last host label, e.g. "www.example.de" -> "DE".
    country_code = F.upper(domain_labels[F.size(domain_labels) - 1])
    fix_country_code =  F.expr("""CASE
                                    WHEN country_code = 'COM' THEN 'US'
                                    WHEN country_code = 'AFRICA'  THEN 'BF'
                                    WHEN country_code = 'MEDIA' THEN 'LY'
                                    WHEN country_code = 'STORE' THEN 'CU'
                                    WHEN country_code IS NULL OR country_code = '' THEN 'Undefined'
                                    ELSE country_code
                                END AS country_code
                            """)
    is_self_reference = F.expr(""" CASE 
                                    WHEN current_domain = reference_domain THEN True
                                    ELSE False
                                END AS is_self_reference
                            """)

    def browser_family(ua):
        # A missing user agent yields None instead of being handed to the parser.
        return None if ua is None else parse(ua).browser.family

    def os_family(ua):
        return None if ua is None else parse(ua).os.family

    browser_udf = F.udf(browser_family, StringType())
    os_udf = F.udf(os_family, StringType())

    return (
        df
        .withColumn('current_domain', current_domain)
        .withColumn('reference_domain', reference_domain)
        .withColumn('country_code', country_code)
        .withColumn('country_code', fix_country_code)
        .withColumn('location_key', F.abs(F.hash('country_code')))
        .withColumn('date_key', F.date_format("local_time", "yyyyMMddHH").cast('int'))
        .withColumn('reference_domain', F.expr("IFNULL(reference_domain,'Undefined') AS reference_domain"))
        .withColumn('is_self_reference', is_self_reference)
        .withColumn('reference_key', F.abs(F.hash('reference_domain')))
        .withColumn('browser', browser_udf("user_agent"))
        .withColumn('browser_key', F.abs(F.hash('browser')))
        .withColumn('os', os_udf("user_agent"))
        .withColumn('os_key', F.abs(F.hash('os')))
        .withColumn("product_id", F.expr("IFNULL(product_id,-1)"))
        .withColumnRenamed("product_id", "product_key")
    )


def _build_fact_view(keyed: DataFrame) -> DataFrame:
    """Count views per dimension-key combination and attach an md5 fact key."""
    import pyspark.sql.functions as F

    key_columns = ["date_key", "location_key", "product_key", "store_id",
                   "reference_key", "browser_key", "os_key"]
    # "|" keeps the concatenation unambiguous (12|345 vs 123|45), so distinct
    # dimension combinations never share a key and get merged by ON CONFLICT.
    fact_key = F.md5(
        F.concat_ws("|", *[F.coalesce(F.col(name).cast("string"), F.lit("")) for name in key_columns])
    )
    return (
        keyed
        .groupBy(*key_columns)
        .agg(F.expr("count(*) AS total_view"))
        .withColumn("key", fact_key)
    )


def process_batch(df: DataFrame, batch_id: int):
    """``foreachBatch`` handler: upsert one micro-batch into the fact and dimension tables.

    The keyed batch feeds four writes, so it is persisted once to avoid re-running
    the user-agent UDFs for every write, and unpersisted afterwards.

    Args:
        df: Normalised behaviour events of this micro-batch.
        batch_id: Micro-batch id supplied by Spark (unused).
    """
    keyed = _add_dimension_keys(df).persist()
    try:
        insert_to_fact_view(_build_fact_view(keyed))
        insert_to_dim_browser(keyed.selectExpr("browser_key", "browser AS browser_name").distinct())
        insert_to_dim_os(keyed.selectExpr("os_key", "os AS os_name").distinct())
        insert_to_dim_reference(
            keyed.selectExpr("reference_key", "reference_domain", "is_self_reference").distinct()
        )
    finally:
        keyed.unpersist()

In [None]:
# Raw Kafka records (binary key/value plus metadata), read from the earliest offset.
# The Spark Kafka source takes its start position from "startingOffsets"; an unprefixed
# "auto.offset.reset" option is not forwarded to the Kafka consumer, so it is not set.
df = (
    spark.readStream
    .format("kafka")
    .option("startingOffsets", "earliest")
    .options(**kaka_conf)
    .load()
)

# maxOffsetsPerTrigger
def normalized_df(df):
    """Parse the JSON Kafka ``value`` payload into typed behaviour-event columns.

    Args:
        df: Kafka source DataFrame with a ``value`` column.

    Returns:
        One column per JSON field. ``time_stamp`` is cast to long and
        ``product_id``/``store_id`` to int; ``local_date`` is added as the calendar
        date of ``local_time``.
    """
    from pyspark.sql.functions import col, from_json, to_date
    from pyspark.sql.types import (
        ArrayType, IntegerType, LongType, StringType, StructField, StructType, TimestampType,
    )

    schema = StructType([
        StructField("_id", StringType(), True),
        StructField("time_stamp", StringType(), True),
        StructField("ip", StringType(), True),
        StructField("user_agent", StringType(), True),
        StructField("resolution", StringType(), True),
        StructField("device_id", StringType(), True),
        StructField("api_version", StringType(), True),
        StructField("store_id", StringType(), True),
        StructField("local_time", TimestampType(), True),
        StructField("show_recommendation", StringType(), True),
        StructField("current_url", StringType(), True),
        StructField("referrer_url", StringType(), True),
        StructField("email_address", StringType(), True),
        StructField("collection", StringType(), True),
        StructField("product_id", StringType(), True),
        StructField("option", ArrayType(StructType([
            StructField("option_label", StringType(), True),
            StructField("option_id", StringType(), True),
            StructField("value_label", StringType(), True),
            StructField("value_id", StringType(), True),
        ])), True),
        StructField("id", StringType(), True)
    ])
    parsed_df = (
        df.withColumn("json_data", from_json(col("value").cast("string"), schema))
        .select("json_data.*")
        .withColumn("time_stamp", col("time_stamp").cast(LongType()))
        .withColumn("product_id", col("product_id").cast(IntegerType()))
        .withColumn("store_id", col("store_id").cast(IntegerType()))
        # local_time is already parsed as a timestamp by the schema above, so no
        # format pattern is needed to take its date.
        .withColumn("local_date", to_date(col("local_time")))
    )
    return parsed_df

# Debug sink: print every 30-second micro-batch of parsed events to the console.
# NOTE(review): process_batch is defined above but not wired in; swap the console sink
# for .foreachBatch(process_batch) (with a checkpointLocation) when ready.
console_writer = (
    df.transform(normalized_df)
    .writeStream
    .format("console")
    .option("truncate", False)
    .trigger(processingTime="30 seconds")
)
query = console_writer.start()
query.awaitTermination()
# To cap batch size, add .option("maxOffsetsPerTrigger", "1000") to the Kafka reader.


24/11/05 09:21:39 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-17a72b18-7963-46bd-a53c-abe667910df5. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/11/05 09:21:39 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/11/05 09:21:40 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------+----------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------+----------+------------------------------------+-----------+--------+-------------------+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+-------------------------------------+----------+----------------------------------------------------------+------------------------------------+----------+
|_id                     |time_stamp|ip    

In [None]:
# query = df.transform(lambda df: normalized_df(df,schema)) \
#     .writeStream \
#     .format("console") \
#     .outputMode("append") \
#     .trigger(processingTime='10 seconds') \
#     .option("checkpointLocation", "/tmp/kafka_checkpoint") \
#     .start()


# query = df.transform(lambda df: normalized_df(df,schema)) \
#     .writeStream \
#     .format("parquet") \
#     .outputMode("append") \
#     .option("checkpointLocation", "/data/data_behavior/kafka_checkpoint/") \
#     .option("path", "/data/data_behavior/product_view") \
#     .start() \
#     .awaitTermination()

In [None]:
#HANDLE Dim Referer
from pyspark.sql.functions import expr

# Distinct referrer domains (null -> 'Undefined') with a self-referral flag and hashed key.
referer_domain_expr = expr("IFNULL(reference_domain,'Undefined') AS reference_domain")
self_reference_expr = expr("""
            CASE 
                WHEN current_domain = reference_domain THEN True
                ELSE False
            END AS is_self_reference
            """)

df_referer = (
    df_behavior_extract_domain
    .select(referer_domain_expr, self_reference_expr)
    .distinct()
    .withColumn('reference_key', abs(hash('reference_domain')))
)
insert_to_dim_reference(df_referer)

In [None]:
#HANDLE DIM BROWER
from pyspark.sql.functions import hash,abs,udf, col

# NOTE(review): `parse(ua).browser.family` matches the `user_agents` package API;
# `parse` was never imported in this notebook — confirm the package.
from user_agents import parse
from pyspark.sql.types import StringType

def parse_browser(ua):
    """Browser family of a raw User-Agent string; a missing string yields None."""
    if ua is None:
        return None
    return parse(ua).browser.family

parse_browser_udf = udf(parse_browser, returnType=StringType())

# Raw user agents only; the statement must end here (a trailing line continuation
# would join it to the next assignment and make the cell a SyntaxError).
df_browser = df_behavior.select("user_agent")
df_browser_repartition = df_browser.repartition(2)

df_browser_final1 = (
    df_browser_repartition
    .withColumn("browser", parse_browser_udf("user_agent"))
    .withColumn("browser_key", abs(hash('browser')))
    .selectExpr("browser_key",
                "IF(browser = '','Undefined',browser) AS browser_name")
    .distinct()
)

insert_to_dim_browser(df_browser_final1)

In [None]:
#Handle Dim OS
from pyspark.sql.functions import hash,abs,udf, col

# NOTE(review): `parse(ua).os.family` matches the `user_agents` package API;
# `parse` was never imported in this notebook — confirm the package.
from user_agents import parse
from pyspark.sql.types import StringType

# Row cap for quick experiments; None parses every event so the OS dimension is complete.
OS_ROW_LIMIT = None

def parse_os(ua):
    """Operating-system family of a raw User-Agent string; a missing string yields None."""
    if ua is None:
        return None
    return parse(ua).os.family

parse_os_udf = udf(parse_os, returnType=StringType())

df_user_agent = df_behavior.select("user_agent")
if OS_ROW_LIMIT is not None:
    df_user_agent = df_user_agent.limit(OS_ROW_LIMIT)

df_os = (
    df_user_agent
    .withColumn("os", parse_os_udf("user_agent"))
    .withColumn("os_key", abs(hash('os')))
    .selectExpr("os_key",
                "IF(os = '','Undefined',os) AS os_name")
    .distinct()
)
insert_to_dim_os(df_os)

In [None]:
# Peek at a few referrer-dimension rows without truncating long domains.
df_referer.show(n=5, truncate=False)

In [None]:
# Re-running is harmless: the browser insert uses ON CONFLICT (browser_key) DO NOTHING.
insert_to_dim_browser(df_browser=df_browser_final1)

In [None]:
# df_browser holds only the raw `user_agent` column, which dim_browser does not have,
# and a plain JDBC "append" fails on primary-key collisions. Write the parsed
# (browser_key, browser_name) frame through the ON CONFLICT DO NOTHING helper instead.
insert_to_dim_browser(df_browser_final1)

In [None]:
# A JDBC write with mode("ignore") skips the save entirely when the table already
# exists, and dim_operating_system is created by the DDL cell, so nothing was ever
# written. Insert through the ON CONFLICT (os_key) DO NOTHING helper instead.
insert_to_dim_os(df_os)

In [None]:
# The DDL cell already closes this connection, and connect_to_postgres() returns None
# on failure, so only close a connection that is still open.
if connection is not None and not connection.closed:
    connection.close()

In [None]:
# Shut down the Spark session; run this only after all other cells have finished.
spark.stop()