In [1]:
# Import app configuration
from typing import Dict
from config import APP_CONFIG

# Split APP_CONFIG into its three sections. A section that is absent falls back
# to an empty dict, so every later `.get(key, default)` lookup still works.
spark_config_dict: Dict[str, str]
input_config_dict: Dict[str, str]
output_config_dict: Dict[str, str]
spark_config_dict, input_config_dict, output_config_dict = (
    APP_CONFIG.get(section_name, {}) for section_name in ('spark', 'input', 'output')
)

# Application name: used for the Spark session and as the output name when writing results.
SPARK_APP_NAME = spark_config_dict.get('name', 'spark-app')

In [2]:
import os

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.dataframe import DataFrame

# Publish the configured memory size through the SPARK_MEM environment variable.
os.environ['SPARK_MEM'] = spark_config_dict.get('memory', '24g')

# Each entry: (Spark property, key inside the 'spark' config section, fallback value).
_spark_property_defaults = [
    ('spark.driver.memory', 'driver.memory', '4g'),
    ('spark.executor.memory', 'executor.memory', '5g'),
    ('spark.executor.cores', 'executor.cores', '3'),
    ('spark.executor.instances', 'executor.instances', '4'),
    ('spark.dynamicAllocation.enabled', 'dynamicAllocation.enabled', 'false'),
]

spark_conf = SparkConf()
for _spark_property, _config_key, _fallback in _spark_property_defaults:
    spark_conf.set(_spark_property, spark_config_dict.get(_config_key, _fallback))

# Configure the builder, then start a new Spark session (or reuse the active one).
_master_url = spark_config_dict.get('master', 'local')
_session_builder = SparkSession.builder.appName(name=SPARK_APP_NAME)
_session_builder = _session_builder.master(master=_master_url)
_session_builder = _session_builder.config(conf=spark_conf)
spark_session = _session_builder.getOrCreate()

# Apply the configured log level (WARN unless overridden) to the Spark context.
_log_level = spark_config_dict.get('logLevel', 'WARN')
spark_session.sparkContext.setLogLevel(_log_level)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/01 18:03:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Read input dataframes.
# - ec_metadata_df: request metadata stored as Parquet. Parquet is self-describing,
#   so no reader options are passed (`header` is a CSV-only option and had no
#   effect on the Parquet reader).
# - input_to_analyze_df: CSV of registered letter codes to analyze; the first line is the header.
# - sequence_category_mapping_df: CSV mapping (sequence, attachments) pairs to a category/action.
# No schema or inferSchema is supplied for the CSV files, so their columns are read as strings.
ec_metadata_df: DataFrame = spark_session.read.parquet(input_config_dict.get('path.metadataRequest'))
input_to_analyze_df: DataFrame = spark_session.read.csv(input_config_dict.get('path.analyzeFile'), header=True)
sequence_category_mapping_df: DataFrame = spark_session.read.csv('resources/sequence_category_mapping.csv', header=True)

In [4]:
# Keep only the rows that have no IUN yet, order them by registered letter code
# and make sure the code column is typed as string; then print the first rows.
# NOTE(review): the recorded output below contains values such as `3.81527e+11`,
# i.e. codes written in scientific notation - presumably already mangled in the
# source CSV. Such values are unlikely to match any registeredLetterCode later on; verify the input.
_iun_is_missing = input_to_analyze_df.iun.isNull()
input_to_analyze_df = (
    input_to_analyze_df
    .filter(_iun_is_missing)
    .sort('codiceraccomandata')
    .withColumn('codiceraccomandata', col('codiceraccomandata').cast('string'))
)
input_to_analyze_df.show(truncate=False)

+------------------+----+
|codiceraccomandata|iun |
+------------------+----+
|3.81527e+11       |NULL|
|3.81532e+11       |NULL|
|381500000296      |NULL|
|381500000321      |NULL|
|381500000376      |NULL|
|381500000560      |NULL|
|381500000867      |NULL|
|381500000889      |NULL|
|381500000935      |NULL|
|381500003495      |NULL|
|381500003701      |NULL|
|381500010562      |NULL|
|381500011244      |NULL|
|381500011346      |NULL|
|381500011916      |NULL|
|381500012406      |NULL|
|381500013246      |NULL|
|381500013623      |NULL|
|381500014112      |NULL|
|381500020496      |NULL|
+------------------+----+


In [5]:
# Deduplicate requests: among the rows that carry a paper product type, find the
# highest version (compared as integer) of every requestId.
# The result exposes the columns `maxRequestId` / `maxVersion`, which is how the
# join condition in the next cell refers to them.
# Note: `max` resolves to the Spark aggregate brought in by the
# `pyspark.sql.functions` star import, not to the Python builtin.
deduplicates_df: DataFrame = (
    ec_metadata_df
    .withColumn('version', col('version').cast('integer'))
    .filter(col('paperMeta_productType').isNotNull())
    .groupby('requestId')
    .agg(max('version').alias('version'))
    .sort(desc('version'))
    .withColumnRenamed('requestId', 'maxRequestId')
    .withColumnRenamed('version', 'maxVersion')
)

In [6]:
# Basic paper-delivery ("postalizzazione") selection - TODO: enhance selection pipeline
#
# Keep, for every requestId, only the row holding its highest version, then
# flatten the event list of each request into space-separated string columns.

# Match each metadata row against the (requestId, max version) pairs computed above.
deduplicates_join_condition = [
    ec_metadata_df.requestId == deduplicates_df.maxRequestId,
    ec_metadata_df.version == deduplicates_df.maxVersion
]

# Output columns:
# - requestTimestamp:       17-character timestamps get ":00Z" appended to their first 16 characters;
#                           any other value is kept as is.
# - statusesString:         every event status code, space separated.
# - businessStatusesString: only the status codes matching the business pattern below.
#                           `rlike` performs an unanchored search, so "(P.*)" matches any code
#                           containing "P" - NOTE(review): confirm this is intended.
# - deliveryFailureCause:   distinct delivery failure causes.
# - attachments:            distinct document types attached to REC...B / REC...E events.
# - registeredLetterCode:   distinct registered letter codes of REC* events, excluding RECAG012 and REC090.
#                           NOTE(review): several distinct codes end up joined into one string.
# - paperIun / paperRequestId: fragments extracted from requestId.
#   The literal dot is written as "[.]" on purpose: Spark's SQL parser unescapes
#   string literals by default, which turns "\." into a bare "." (any character).
all_paper_metadata_df: DataFrame = ec_metadata_df \
    .join(other=deduplicates_df, on=deduplicates_join_condition) \
    .selectExpr(
        'if (length(requestTimestamp) = 17, concat(substr(requestTimestamp, 0, 16), ":00Z"), requestTimestamp) as requestTimestamp',
        'paperMeta_productType',
        'array_join(transform(event_list, e -> e.paperProg_statusCode), " ") as statusesString',
        'array_join(transform(filter(event_list, e -> e.paperProg_statusCode rlike "CON080|CON016|(CON9.*)|(RECRN.*)|(RECAG.*)|(RECRS.*)|(P.*)|(RECRSI.*)|(RECRI.*)"),e -> e.paperProg_statusCode), " ") as businessStatusesString',
        'array_join(array_distinct(transform(event_list,e -> e.paperProg_deliveryFailureCause)), " ") as deliveryFailureCause',
        'array_join(array_distinct(flatten(transform(filter(event_list, e -> e.paperProg_statusCode rlike "(REC.*B)|(REC.*E)").paperProg_attachments,e -> e.documentType))), " ") as attachments',
        'array_join(array_distinct(transform(filter(event_list, e -> e.paperProg_statusCode rlike "REC.*" AND NOT e.paperProg_statusCode in ("RECAG012","REC090")),e -> e.paperProg_registeredLetterCode)), " ") as registeredLetterCode',
        'requestId',
        'regexp_extract(requestId, ".*IUN_(.*)[.]RECINDEX.*", 1) as paperIun',
        'regexp_extract(requestId, "pn-cons-000~(.*)[.]PCRETRY_.", 1) as paperRequestId',
        'version as paperVersion'
    )

# Replace the empty strings produced by array_join() on empty arrays with nulls
all_paper_metadata_df = all_paper_metadata_df.na.replace('', None)

In [7]:
# Attach the paper metadata to every letter code under analysis. The join is a
# left join, so codes without a matching registeredLetterCode are kept with
# null metadata columns.
_letter_code_matches = (
    input_to_analyze_df.codiceraccomandata == all_paper_metadata_df.registeredLetterCode
)
iun_from_letter_code_df: DataFrame = input_to_analyze_df.join(
    all_paper_metadata_df,
    on=_letter_code_matches,
    how='left',
)

# Debug aid: iun_from_letter_code_df.show(n=500, truncate=False)

In [8]:
# `sequence` is the status history rendered as "A->B->C".
# When the business statuses contain at least one "REC" code, the P000 and CON*
# codes are stripped so that only the remaining statuses are kept;
# otherwise the whole business status history is used.
_business_statuses = iun_from_letter_code_df.businessStatusesString
_without_p000_and_con = regexp_replace(_business_statuses, 'P000|CON[a-zA-Z0-9]*', '')
_selected_statuses = when(
    _business_statuses.contains('REC'),
    _without_p000_and_con
).otherwise(_business_statuses)

# Trim the outer blanks, then turn every run of spaces into an arrow.
sequence: Column = regexp_replace(trim(_selected_statuses), ' +', '->')

# Project the columns needed downstream, renaming two of them on the way.
_sequence_columns = [
    iun_from_letter_code_df.codiceraccomandata.alias('originalRegisteredLetterCode'),
    iun_from_letter_code_df.paperIun,
    iun_from_letter_code_df.registeredLetterCode,
    iun_from_letter_code_df.paperMeta_productType.alias('paperMetaProductType'),
    iun_from_letter_code_df.attachments,
    iun_from_letter_code_df.requestId,
    sequence.alias('sequence'),
]
iun_from_letter_code_with_sequence_df = iun_from_letter_code_df.select(*_sequence_columns)

# Debug aid: iun_from_letter_code_with_sequence_df.show(truncate=False)

In [9]:
# One JSON document per request, carrying its requestId and registeredLetterCode.
_request_info_json = to_json(struct(
    iun_from_letter_code_with_sequence_df.requestId,
    iun_from_letter_code_with_sequence_df.registeredLetterCode,
))

# Group by (sequence, attachments). For each group:
# - requestCount: number of rows with a non-null paperIun;
# - requestInformationSet: the distinct request JSON documents, cast to one string.
group_by_sequences_df: DataFrame = (
    iun_from_letter_code_with_sequence_df
    .groupby('sequence', 'attachments')
    .agg(
        count('paperIun').alias('requestCount'),
        collect_set(_request_info_json).cast('string').alias('requestInformationSet'),
    )
)

In [10]:
# Null-safe equality on both keys, so a null sequence/attachments value in the
# grouped data can still match a null entry of the mapping file.
_grouped = group_by_sequences_df
_mapping = sequence_category_mapping_df
labeled_join_condition = [
    _grouped.sequence.eqNullSafe(_mapping.mappedSequence),
    _grouped.attachments.eqNullSafe(_mapping.mappedAttachments),
]

# Columns of the labeled report (requestInformationSet is deliberately left out).
_labeled_columns = [
    _grouped.sequence,
    _grouped.attachments,
    _mapping.category,
    _mapping.action,
    _grouped.requestCount,
]

# Left join: groups without a mapping entry are kept with null category/action.
# The most frequent groups come first.
group_by_sequence_labeled_df: DataFrame = (
    _grouped
    .join(_mapping, on=labeled_join_condition, how='left')
    .select(*_labeled_columns)
    .sort(_grouped.requestCount.desc())
)

In [11]:
from utils.custom_data_frame_writer import CustomDataFrameWriter

# Write out the labeled report as a single partition.
# repartition() shuffles the rows and gives no ordering guarantee, so the
# descending requestCount order established upstream would not reliably reach
# the output. The rows are therefore sorted again inside the single resulting
# partition, which needs no additional shuffle.
# NOTE(review): assumes CustomDataFrameWriter writes rows in partition order - confirm.
CustomDataFrameWriter.write(
    df=group_by_sequence_labeled_df
        .repartition(1)
        .sortWithinPartitions(desc('requestCount')),
    output_name=SPARK_APP_NAME,
    output_folder=output_config_dict.get('path'),
    output_format=output_config_dict.get('format')
)

                                                                                