In [25]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
import pandas as pd
from pyspark.sql.functions import when, col
import os
from functools import reduce
import logging
from datetime import datetime


# Shared Spark session for every cell below. The MySQL Connector/J package is
# pulled in through spark.jars.packages so the JDBC writer in import_mysql can
# load "com.mysql.cj.jdbc.Driver".
spark = (
    SparkSession.builder
    .appName("ETL_Scripts")
    .config('spark.driver.memory', '4g')
    .config('spark.executor.memory', '8g')
    .config("spark.jars.packages", "com.mysql:mysql-connector-j:8.0.33")
    .getOrCreate()
)

In [26]:
def process_data(df, date):
    """Keep the relevant ``_source`` fields of one day's log and tag them with ``date``.

    Args:
        df: raw DataFrame read from a daily JSON log; expected to contain a
            ``_source`` struct with Contract, AppName, Mac and TotalDuration.
        date: value stored as a literal in the new ``Date`` column.

    Returns:
        DataFrame with columns Contract, AppName, Mac, TotalDuration, Date.
    """
    source_fields = ['_source.Contract', '_source.AppName',
                     '_source.Mac', '_source.TotalDuration']
    projected = df.select(*source_fields)
    return projected.withColumn("Date", lit(date))
    
def process_Category(df):
    """Map each row's ``AppName`` to a duration category stored in ``Type``.

    The resulting ``Type`` is one of TVDuration, MovieDuration, RelaxDuration,
    ChildDuration or SportDuration; any other AppName leaves ``Type`` null.
    The ``AppName`` and ``Mac`` columns are dropped afterwards.
    """
    app_name = col("AppName")
    tv_apps = ['CHANNEL', 'DSHD', 'KPLUS', 'Kplus']
    movie_apps = ['VOD', 'FIM_RES', 'BHD_RES', 'VOD_RES', 'FIMS', 'BHD', 'DANET']

    category = (
        when(app_name.isin(tv_apps), "TVDuration")
        .when(app_name.isin(movie_apps), "MovieDuration")
        .when(app_name == 'RELAX', "RelaxDuration")
        .when(app_name == 'CHILD', "ChildDuration")
        .when(app_name == 'SPORT', "SportDuration")
    )
    return df.withColumn("Type", category).drop("AppName", "Mac")

def pivot(df, categories=None):
    """Turn per-row (Contract, Type, TotalDuration) records into one row per contract.

    Durations are summed per contract and category, then spread into one
    column per category, with missing combinations filled with 0.

    Args:
        df: DataFrame with Contract, Type and TotalDuration columns (as
            produced by ``process_Category``).
        categories: pivot values to emit as columns. Defaults to the five
            duration categories consumed by ``calculate_user_behavior``.

    Returns:
        DataFrame with ``Contract`` plus one column per category.
    """
    if categories is None:
        # Listed explicitly (in the alphabetical order Spark would have chosen)
        # so that: every column exists even if a category is absent from the
        # selected days; rows whose Type is null (unmapped AppName) do not
        # produce a stray "null" column; and Spark skips the extra pass it
        # would otherwise need to discover the distinct pivot values.
        categories = ["ChildDuration", "MovieDuration", "RelaxDuration",
                      "SportDuration", "TVDuration"]

    # `sum` here is pyspark.sql.functions.sum (star import), not the builtin.
    data = df.groupBy("Contract", "Type").agg(sum('TotalDuration').alias("TotalDuration"))
    data = data.groupBy("Contract").pivot("Type", list(categories)).sum("TotalDuration")
    return data.fillna(0)

def calculate_user_behavior(df):
    """Add TotalDuration, MostWatch and Taste columns to the pivoted frame.

    Expects ChildDuration, MovieDuration, RelaxDuration, SportDuration and
    TVDuration columns (as produced by ``pivot``).

    Adds:
        TotalDuration: sum of the five duration columns.
        MostWatch: label of the category with the largest duration; ties are
            won by the earliest label in the order Child, Movie, Relax,
            Sport, TV.
        Taste: "-"-joined labels of every category with a positive duration,
            in the same order (empty string when none is positive).
    """
    # (label, duration column) in tie-breaking / Taste order.
    cols = [("Child", "ChildDuration"),
            ("Movie", "MovieDuration"),
            ("Relax", "RelaxDuration"),
            ("Sport", "SportDuration"),
            ("TV", "TVDuration")]

    # Calculate TotalDuration
    df = df.withColumn('TotalDuration', col('ChildDuration') + col('MovieDuration') + col('RelaxDuration')
                       + col('SportDuration') + col('TVDuration'))

    # Calculate MostWatch: each label wins when its own column is >= every
    # other column. Building the branches from the table guarantees every
    # branch compares the right column (the hand-written TV branch previously
    # compared SportDuration, leaving MostWatch null for many TV-heavy users).
    most_watch = None
    for name, c in cols:
        is_max = None
        for _, other in cols:
            if other == c:
                continue
            cond = col(c) >= col(other)
            is_max = cond if is_max is None else (is_max & cond)
        most_watch = when(is_max, name) if most_watch is None else most_watch.when(is_max, name)
    df = df.withColumn('MostWatch', most_watch)

    # Calculate Taste: concat_ws skips the nulls produced for non-positive durations.
    taste_expr = [when(col(c) > 0, name) for name, c in cols]
    df = df.withColumn("Taste", concat_ws("-", *taste_expr))

    return df

def analyze_user_type(df):
    """Label each row Low / Medium / High by where TotalDuration falls.

    The 25th and 75th percentiles of TotalDuration are computed eagerly with
    ``approxQuantile`` (relativeError 0, i.e. exact). Rows below the first
    quartile are "Low", rows above the third quartile are "High", and the
    rest are "Medium". The label is written to a ``Type`` column.
    """
    quartiles = dict(zip(("q1", "q3"),
                         df.approxQuantile('TotalDuration', [0.25, 0.75], 0)))
    level = (
        when(df.TotalDuration < quartiles['q1'], "Low")
        .when(df.TotalDuration > quartiles['q3'], "High")
        .otherwise("Medium")
    )
    return df.withColumn("Type", level)

def calc_activeness(df):
    """Return, per Contract, the number of distinct Date values it appears on.

    Output columns: Contract, Activeness.
    """
    contract_days = df.select("Date", "Contract").distinct()
    per_contract = contract_days.groupBy("Contract").count()
    return per_contract.withColumnRenamed("count", "Activeness")


def calc_clinginess(df):
    """Combine the usage level (``Type``) and ``Activeness`` into ``Clinginess``.

    Rules (Activeness = number of active days):
        Low    type: <= 20 days -> Low,    > 20 -> Medium
        Medium type: <= 10 days -> Low,  11..20 -> Medium, > 20 -> High
        High   type: <= 10 days -> Medium, > 10 -> High
    Rows matching no rule (e.g. null Type) get a null Clinginess.
    """
    level = col('Type')
    days = col('Activeness')
    rules = [
        ((level == 'Low') & (days <= 20), 'Low'),
        ((level == 'Low') & (days > 20), 'Medium'),
        ((level == 'Medium') & (days <= 10), 'Low'),
        ((level == 'Medium') & (days > 10) & (days <= 20), 'Medium'),
        ((level == 'Medium') & (days > 20), 'High'),
        ((level == 'High') & (days <= 10), 'Medium'),
        ((level == 'High') & (days > 10), 'High'),
    ]

    first_cond, first_label = rules[0]
    clinginess = when(first_cond, first_label)
    for cond, label in rules[1:]:
        clinginess = clinginess.when(cond, label)

    return df.withColumn('Clinginess', clinginess)


def generate_list_day(from_day, to_day, year=2022, month=4):
    """Return ``YYYYMMDD`` strings for every day from ``from_day`` to ``to_day`` inclusive.

    Args:
        from_day: first day of the month to include.
        to_day: last day of the month to include; an empty list is returned
            when it is smaller than ``from_day``.
        year: calendar year of the dates (default 2022, the dataset's year).
        month: calendar month of the dates (default 4, April).

    Returns:
        List of zero-padded date keys such as ``"20220401"``, matching the
        daily log file names.

    Raises:
        ValueError: if any day in the range is not a real date for
            ``year``/``month`` (e.g. 0, negative, or April 31), instead of
            silently producing a key for a file that cannot exist.
    """
    return [datetime(year, month, day).strftime("%Y%m%d")
            for day in range(from_day, to_day + 1)]

def import_mysql(data, user, password, db, table, batch_size=500,
                 host="localhost", port=3306, num_partitions=5):
    """Append ``data`` to a MySQL table through Spark's JDBC writer.

    The write is best-effort: a failure is logged with its traceback instead
    of being raised, and the outcome is reported through the return value.

    Args:
        data: Spark DataFrame to write.
        user: MySQL user name.
        password: MySQL password (never logged).
        db: database name used in the JDBC URL.
        table: destination table (rows are appended).
        batch_size: rows per JDBC insert batch.
        host: MySQL host (default "localhost").
        port: MySQL port (default 3306).
        num_partitions: partitions (hence parallel JDBC connections) used for the write.

    Returns:
        True if the write completed, False if it raised.
    """
    url = f"jdbc:mysql://{host}:{port}/{db}"
    try:
        logging.info("Starting to write data to MySQL...")
        (data.repartition(num_partitions).write.format("jdbc")
            .option("driver", "com.mysql.cj.jdbc.Driver")
            .option("url", url)
            .option("dbtable", table)
            .mode("append")
            .option("user", user)
            .option("batchsize", batch_size)
            .option("password", password)
            .option("socketTimeout", "3600")
            .save())
    except Exception:
        # Top-level write boundary: keep the original best-effort behaviour,
        # but record the full traceback rather than printing just the message.
        logging.exception("Error occurred while writing to MySQL")
        return False
    logging.info("Data written to MySQL successfully.")
    return True

def read_all_day(list_day, folder=None):
    """Read the daily JSON logs for ``list_day`` and union them into one DataFrame.

    Each day's file is passed through ``process_data`` with the date rendered
    as ``YYYY-MM-DD`` before the frames are unioned (by column position).

    Args:
        list_day: iterable of ``YYYYMMDD`` strings naming ``<date>.json`` files.
        folder: directory containing the daily files; defaults to the
            module-level ``folderpath`` setting.

    Returns:
        A single DataFrame covering every requested day.

    Raises:
        ValueError: if ``list_day`` is empty (there is nothing to union).
    """
    days = list(list_day)
    if not days:
        raise ValueError("read_all_day() needs at least one date")
    if folder is None:
        folder = folderpath  # module-level setting from the driver cell

    all_dataframes = []
    for date in days:
        # os.path.join instead of a hard-coded "\\" so this also runs off Windows.
        filepath = os.path.join(folder, date + ".json")
        df = spark.read.json(filepath)

        formatted_date = datetime.strptime(date, "%Y%m%d").strftime("%Y-%m-%d")
        all_dataframes.append(process_data(df, formatted_date))

    return reduce(lambda df1, df2: df1.union(df2), all_dataframes)

def transform_all_day(df):
    """Run the Customer360 transformation chain over the unioned daily logs.

    Steps: categorise each row, pivot durations per contract, derive
    behaviour columns, bucket total usage, join the per-contract activeness
    (distinct active days), and finally derive clinginess.

    The input is persisted because the categorised rows feed two branches
    (the pivot and the activeness count). It is not unpersisted here.
    """
    df.persist()

    print("---Processing Category---")
    categorized = process_Category(df)

    print("---Pivoting---")
    summary = pivot(categorized)

    print("---Calculating Behavior---")
    summary = calculate_user_behavior(summary)

    print("---Analyzing Type---")
    summary = analyze_user_type(summary)

    print("---Calculating Activeness---")
    activeness = calc_activeness(categorized)
    summary = summary.join(activeness, "Contract", "inner")

    print("---Calculating Clinginess---")
    return calc_clinginess(summary)


def main_task():
    """Ask for a day range in April 2022, then load and transform those days.

    Reads the first and last day numbers from standard input, echoes the
    range, and returns the transformed Customer360 DataFrame.
    """
    start_day = int(input("From: "))
    end_day = int(input("To: "))
    print(f"From 2022-04-{start_day} to 2022-04-{end_day}")

    days = generate_list_day(start_day, end_day)
    raw = read_all_day(days)
    return transform_all_day(raw)



In [None]:
folderpath = "D:\\Code_Space\\study_de\\Dataset\\Dataset\\log_content"

host = 'localhost'
port = '3306'
db_name = 'Data_Warehouse'
table_name = 'Customer360'
# Credentials can be supplied via the environment instead of being edited
# into the notebook; the defaults keep the previous behaviour.
user = os.environ.get('MYSQL_USER', 'root')
password = os.environ.get('MYSQL_PASSWORD', '')
# NOTE(review): url and driver are not passed to import_mysql below.
url = 'jdbc:mysql://' + host + ':' + port + '/' + db_name
driver = "com.mysql.cj.jdbc.Driver"


df = main_task()
# The result is written twice (CSV, then MySQL); cache it so the whole
# pipeline is not recomputed for the second write.
df.persist()
try:
    df.repartition(1).write.csv(f'D:\\Code_Space\\study_de\\Project\\Customer360\\Sample_output_data', header = True)
    import_mysql(df, user, password, db_name, table_name, batch_size=500)
finally:
    df.unpersist()
