# In[16]:
from py4j.java_gateway import java_import
from pyspark.sql import SparkSession
from urllib.parse import urlparse
from pyspark import SparkContext
from pyspark.sql.functions import col, struct, upper, to_date, from_unixtime, count, sum, lit, collect_list, when, first, udf
from pyspark.sql.types import IntegerType, LongType, FloatType, DoubleType, StringType, ArrayType


def get_files(path, size=-1):
    """Return the oldest files directly under `path` that fit a size budget.

    Lists the children of `path` with the Hadoop FileSystem API, orders them by
    modification time (oldest first), skips names ending in ``.tmp`` and
    collects paths until the next file would not fit in the remaining budget.

    Args:
        path: Hadoop-compatible directory URI, e.g. ``s3a://bucket/prefix/``.
        size: Budget in ``FileStatus.getLen()`` units (bytes). ``-1`` (any
            negative value, or ``None``) means no limit.

    Returns:
        List of fully-qualified path strings, oldest first.
    """
    sc = spark.sparkContext
    hadoop_fs = sc._jvm.org.apache.hadoop.fs
    hadoop_path = hadoop_fs.Path(path)
    fs = hadoop_fs.FileSystem.get(
        sc._jvm.java.net.URI.create(path), sc._jsc.hadoopConfiguration()
    )

    unlimited = size is None or size < 0
    remaining_capacity = size
    result_list = []

    # py4j exposes the Java FileStatus[] as an iterable, so sorted() works directly.
    for file_status in sorted(fs.listStatus(hadoop_path), key=lambda s: s.getModificationTime()):
        # Skip .tmp files before the budget check so a large temporary file can
        # neither consume budget nor end the batch early.
        if file_status.getPath().getName().endswith(".tmp"):
            continue
        if not unlimited:
            file_len = file_status.getLen()
            # Strict comparison kept from the original: a file that would use up
            # the budget exactly is left for the next run.
            if remaining_capacity - file_len <= 0:
                break
            remaining_capacity -= file_len
        result_list.append(file_status.getPath().toString())

    return result_list

def move_files_to_processed(files):
    """Delete each given file and print the outcome.

    NOTE: despite its name this function does not move anything. Every path in
    `files` is deleted with a non-recursive ``FileSystem.delete`` call.

    Args:
        files: Iterable of fully-qualified Hadoop path strings, e.g. the list
            returned by ``get_files``.
    """
    sc = spark.sparkContext
    conf = sc._jsc.hadoopConfiguration()

    for file in files:
        hadoop_path = sc._jvm.org.apache.hadoop.fs.Path(file)
        # Resolve the FileSystem from the file's own URI rather than a fixed
        # bucket, so paths outside s3a://intent-raw are handled as well.
        fs = hadoop_path.getFileSystem(conf)
        if fs.delete(hadoop_path, False):
            print(f"Deleted {file}")
        else:
            print(f"Failed to delete {file}")

# "null-if-sentinel" UDFs: each maps one placeholder value (0 or "") to null
# and returns every other value unchanged, typed as the given Spark type.
def _null_if(sentinel, return_type):
    """Build a UDF that returns None when its input equals `sentinel`."""
    return udf(lambda v: None if v == sentinel else v, return_type)


ns_int = _null_if(0, IntegerType())
ns_long = _null_if(0, LongType())
ns_float = _null_if(0, FloatType())
ns_double = _null_if(0, DoubleType())
ns_string = _null_if("", StringType())

# UDF for calculating session lengths
def session_lengths(arr, dist):
    """Split sorted values into sessions and return each session's length.

    Values are sorted, then grouped: a value joins the current session while its
    distance from that session's FIRST value is less than `dist`. Otherwise it
    starts a new session.

    Args:
        arr: Numeric values (e.g. timestamps), or None.
        dist: Exclusive upper bound on the distance from a session's start.

    Returns:
        One ``last - first`` length per session, in ascending session order.
        Returns ``[]`` for None or empty input.
    """
    # Guard empty input as well as None; indexing ordered[0] would raise otherwise.
    if not arr:
        return []

    ordered = sorted(arr)
    lengths = []
    start = end = ordered[0]
    for value in ordered[1:]:
        if value - start < dist:
            end = value
        else:
            lengths.append(end - start)
            start = end = value
    lengths.append(end - start)
    return lengths

# UDF for calculating engaged sessions
def engaged_sessions(arr, minimum):
    """Count session lengths in `arr` that are >= `minimum` (0 when arr is None)."""
    engaged = 0
    if arr is not None:
        for length in arr:
            if length >= minimum:
                engaged += 1
    return engaged

# UDF for calculating bounced sessions
def bounced_sessions(arr, minimum):
    """Count session lengths in `arr` that are < `minimum` (0 when arr is None)."""
    return 0 if arr is None else len(list(filter(lambda length: length < minimum, arr)))

# UDF for calculating total time on site
def time_on_site(arr):
    """Return the total of the session lengths in `arr` (0 when arr is None).

    The file imports ``sum`` from ``pyspark.sql.functions``, which shadows the
    builtin. That version builds a Spark Column and cannot total a Python list,
    so the builtin is referenced explicitly here.
    """
    import builtins

    if arr is None:
        return 0

    return builtins.sum(arr)

# Wrap the session helpers as Spark UDFs with explicit return types.
# session_lengths yields one length per session; the rest yield a single count/total.
session_lengths_udf = udf(f=session_lengths, returnType=ArrayType(LongType()))
engaged_sessions_udf = udf(f=engaged_sessions, returnType=LongType())
bounced_sessions_udf = udf(f=bounced_sessions, returnType=LongType())
time_on_site_udf = udf(f=time_on_site, returnType=LongType())

# In[None]:
# For each RTB entity type: read a batch of raw JSON files, project it into
# typed nested structs (placeholder 0 / "" values become null via the ns_*
# UDFs), append it to a Delta table partitioned by date, then delete the raw files.
for prefix in ["auction", "lose", "win", "impression", "click"]:
    print(prefix)
    rtbEntity = get_files(f"s3a://intent-raw/new_rtb_{prefix}/", 50000000000)
    if not rtbEntity:
        # Nothing to ingest for this prefix. Reading an empty path list gives
        # no usable schema for the select below.
        continue

    entity = (
        spark.read.json(rtbEntity)
        .select(
            col("id"),
            struct(
                ns_string(col("publisher.id")).alias("id"),
                ns_string(col("publisher.category")).alias("category"),
                ns_string(col("publisher.site")).alias("site"),
                ns_string(col("publisher.language")).alias("language"),
                ns_string(col("publisher.app")).alias("app"),
                ns_string(upper(col("publisher.country"))).alias("country"),
            ).alias("publisher"),
            struct(
                ns_string(col("user.telco_id")).alias("telco_id"),
                ns_string(col("user.intent_id")).alias("intent_id"),
                ns_string(col("user.ad_exchange_user_id")).alias("ad_exchange_user_id"),
                # NOTE(review): reads top-level "profile", not "user.profile" — confirm intended.
                col("profile"),
            ).alias("user"),
            struct(
                ns_string(col("device.ifa")).alias("ifa"),
                ns_string(col("device.make")).alias("make"),
                ns_string(col("device.model")).alias("model"),
                ns_string(col("device.os_name")).alias("os_name"),
                ns_string(col("device.os_version")).alias("os_version"),
                ns_string(col("device.connection_type")).alias("connection_type"),
                ns_string(col("device.carrier")).alias("carrier"),
                ns_string(col("device.language")).alias("language"),
            ).alias("device"),
            struct(
                ns_string(col("geo.country")).alias("country"),
                ns_string(col("geo.region")).alias("region"),
                ns_string(col("geo.city")).alias("city"),
                ns_long(col("geo.utc_offset")).alias("utc_offset"),
            ).alias("geo"),
            struct(
                ns_string(col("advertiser.id")).alias("id"),
                ns_string(col("advertiser.name")).alias("name"),
                ns_string(col("advertiser.currency")).alias("currency"),
                ns_double(col("advertiser.spend")).alias("spend"),
                ns_double(col("advertiser.spend_usd")).alias("spend_usd"),
            ).alias("advertiser"),
            struct(
                ns_string(col("campaign.id")).alias("id"),
                ns_string(col("campaign.name")).alias("name"),
                col("campaign.categories"),
                ns_string(col("campaign.optimization_type")).alias("optimization_type"),
                ns_double(col("campaign.user_cpx")).alias("user_cpx"),
                ns_double(col("campaign.cpx")).alias("cpx"),
            ).alias("campaign"),
            struct(
                ns_string(col("creative.id")).alias("id"),
                ns_string(col("creative.format")).alias("format"),
                ns_long(col("creative.height")).alias("height"),
                ns_long(col("creative.width")).alias("width"),
            ).alias("creative"),
            struct(
                # NOTE(review): source field is spelled "postition" and renamed to
                # "position" here; confirm the raw data really uses that spelling.
                col("placement.postition").alias("position"),
                col("placement.is_interstitial").alias("is_interstitial"),
                col("placement.is_rewarded").alias("is_rewarded"),
                col("placement.video_minduration").alias("video_minduration"),
                col("placement.video_maxduration").alias("video_maxduration"),
                col("placement.video_startdelay").alias("video_startdelay"),
                col("placement.video_placement").alias("video_placement"),
                ns_long(col("placement.height")).alias("height"),
                ns_long(col("placement.width")).alias("width"),
            ).alias("placement"),
            struct(
                ns_string(col("model.notelco_model_name")).alias("notelco_model_name"),
                ns_string(col("model.notelco_model_version")).alias("notelco_model_version"),
                ns_string(col("model.notelco_run_id")).alias("notelco_run_id"),
                ns_double(col("model.notelco_model_probability_cc")).alias("notelco_model_probability_cc"),
                ns_double(col("model.notelco_model_probability_sc")).alias("notelco_model_probability_sc"),
                ns_double(col("model.notelco_model_probability_final")).alias("notelco_model_probability_final"),
                # ns_string(col("model.telco_model_name")).alias("telco_model_name"),
                # ns_string(col("model.telco_model_version")).alias("telco_model_version"),
                # ns_string(col("model.telco_run_id")).alias("telco_run_id"),
                ns_string(col("model.win_model_name")).alias("win_model_name"),
                ns_string(col("model.win_model_version")).alias("win_model_version"),
                ns_string(col("model.win_run_id")).alias("win_run_id"),
                ns_double(col("model.win_model_probability")).alias("win_model_probability"),
                ns_double(col("model.win_model_lambda")).alias("win_model_lambda"),
            ).alias("model"),
            struct(
                ns_string(col("bid.request_id")).alias("request_id"),
                col("bid.bid_index").alias("bid_index"),
                ns_double(col("bid.bid_price")).alias("bid_price"),
                ns_double(col("bid.win_price")).alias("win_price"),
                ns_double(col("bid.second_price")).alias("second_price"),
                ns_double(col("bid.spend")).alias("spend"),
                ns_double(col("bid.bid_floor")).alias("bid_floor"),
                ns_string(col("bid.exchange_id")).alias("exchange_id"),
                col("bid.time"),
                col("bid.loss_reason_code"),
                ns_double(col("bid.currency_rate")).alias("currency_rate"),
                ns_string(col("bid.displaymanager")).alias("displaymanager"),
            ).alias("bid"),
            col("time").alias("time"),
        )
        # Partition key: the calendar date of the unix "time" field, as a string.
        .withColumn("date", to_date(from_unixtime(col("time"))).cast("string"))
    )

    (
        entity.repartition(12)
        .write.format("delta")
        .partitionBy("date")
        .mode("Append")
        .option("mergeSchema", True)
        .save(f"s3a://intent-parquet/rtb_{prefix}")
    )

    # Reached only if the write above did not raise, so raw files are removed
    # only after they have been appended.
    move_files_to_processed(rtbEntity)

# Ingest all pending raw RTB event files (no size budget) as-is into a Delta
# table partitioned by the date of their "timestamp" field, then delete them.
rtbEventFiles = get_files("s3a://intent-raw/new_rtb_event/")

if rtbEventFiles:
    (
        spark.read.json(rtbEventFiles)
        .withColumn("date", to_date(from_unixtime(col("timestamp"))))
        .repartition(12)
        .write.format("delta")
        .partitionBy("date")
        .mode("Append")
        .option("mergeSchema", True)
        .save("s3a://intent-parquet/rtb_event")
    )

    # Reached only if the write above did not raise.
    move_files_to_processed(rtbEventFiles)