In [1]:
# ### Libraries and session

In [2]:
# Dump the kernel's environment variables (see the dict output below).
# NOTE(review): presumably the IPython `%env` automagic; the output can expose
# credentials and hostnames, so clear it before sharing the notebook.
env

{'SPARK_TGZ_ASC_URL': 'https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz.asc',
 'SPARK_VERSION_SHORT': '3.4',
 'HOSTNAME': 'spark-iceberg',
 'LANGUAGE': 'en_US:en',
 'AWS_SDK_BUNDLE_VERSION': '2.20.18',
 'S3_PATH_STYLE_ACCESS': 'true',
 'ICEBERG_VERSION': '1.3.1',
 'JAVA_HOME': '/opt/java/openjdk',
 'SPARK_TGZ_URL': 'https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz',
 'AWS_DEFAULT_REGION': 'us-east-1',
 'AWS_REGION': 'us-east-1',
 'PWD': '/opt/spark',
 'S3_ENDPOINT': 'http://minio:9000',
 'HOME': '/root',
 'LANG': 'en_US.UTF-8',
 'GPG_KEY': '<hidden>',
 'HADOOP_AWS_VERSION': '3.3.4',
 'AWS_SECRET_ACCESS_KEY': '<hidden>',
 'AWS_SDK_VERSION': '1.12.262',
 'PYTHONPATH': '/opt/spark/python:/opt/spark/python/lib/py4j-0.10.9.7-src.zip:',
 'TERM': 'xterm-color',
 'SPARK_VERSION': '3.4.2',
 'SHLVL': '0',
 'AWS_ACCESS_KEY_ID': '<hidden>',
 'SPARK_HOME': '/opt/spark',
 'ZSH_THEME': 'robbyrussell',
 'LC_ALL': 'en_US.UTF-8',
 'PATH': '/opt/s

In [2]:
import os

import boto3
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
from pyspark.sql.functions import year, to_date, month, dayofmonth,  from_unixtime, unix_timestamp
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, DateType, DecimalType

In [3]:
# Names of the two audit columns that audit_add_column() appends to a DataFrame.
REC_CREATED_COLUMN_NAME, REC_UPDATED_COLUMN_NAME = "rec_created", "rec_updated"

In [4]:
# Deployment environment; used as the bucket-name prefix (e.g. "prd-landing-zone")
environment = 'prd'

# Source
system_source = "pscore"
system_table = "pscore_sgw"

# Set the bucket and folder paths
source_bucket = 'landing-zone'
source_folder = f'files/{system_source}/ascll'

lakehouse_bucket = 'lakehouse'
lakehouse_folder = 'iceberg'

# table destination settings
dest_db_catalog = 'iceberg'
dest_db_schema = 'bronze'
dest_db_table = system_table
dest_final_db = f'{dest_db_catalog}.{dest_db_schema}'
dest_final_table = f'{dest_final_db}.{dest_db_table}'

# Spark identification and settings
# dest_final_table is already fully qualified (catalog.schema.table). Prefixing
# it with dest_final_db again produced the duplicated application name
# 'BRONZE_iceberg.bronze.iceberg.bronze.pscore_sgw'.
appname = f'BRONZE_{dest_final_table}'
log_level = 'WARN' # Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN

# MinIO connection settings: taken from the container environment when the
# variables are set, otherwise falling back to the local development defaults.
# NOTE(review): the fallbacks keep the notebook runnable as before; drop them
# once every environment provides these variables.
s3_endpoint = os.environ.get('S3_ENDPOINT', 'http://minio:9000')
s3_access_key = os.environ.get('AWS_ACCESS_KEY_ID', 'minio')
s3_secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY', 'minio123')

In [5]:
# Create (or reuse) the Spark session, labelled with the configured application name.
spark = (
    SparkSession.builder
    .appName(appname)
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/21 14:30:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
print("=================================================")

# Apply the log level chosen in the configuration cell.
spark.sparkContext.setLogLevel(log_level)

# Print the Spark configuration as a list of (key, value) pairs.
# NOTE(review): this builds a new SparkConf object rather than reading the
# running session's configuration, and the output includes the S3 access keys.
spark_conf_entries = pyspark.SparkConf().getAll()
print(spark_conf_entries)

[('spark.hadoop.hive.cli.print.header', 'true'), ('spark.hadoop.fs.s3a.connection.ssl.enabled', 'false'), ('spark.hadoop.fs.s3a.path.style.access', 'true'), ('spark.app.name', 'BRONZE_iceberg.bronze.iceberg.bronze.pscore_sgw'), ('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkSessionCatalog'), ('spark.hadoop.fs.s3.endpoint', 'http://minio:9000'), ('spark.hadoop.fs.s3.access.key', 'minio'), ('spark.hadoop.fs.s3.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem'), ('spark.sql.catalog.iceberg.uri', 'thrift://hive-metastore:9083'), ('spark.sql.catalog.iceberg.s3.endpoint', 'http://minio:9000'), ('spark.sql.catalog.iceberg.s3.path-style-access', 'true'), ('spark.hadoop.fs.s3.path.style.access', 'true'), ('spark.hive.metastore.uris', 'thrift://hive-metastore:9083'), ('spark.master', 'local[*]'), ('spark.submit.deployMode', 'client'), ('spark.app.submitTime', '1711031452592'), ('spark.hadoop.fs.s3a.access.key', 'minio'), ('spark.sql.catalog.iceberg.io-impl', 'org.apache.iceberg

In [7]:
def audit_add_column(df):
    """
    Append the record-audit timestamp columns to a DataFrame.

    Two columns are added, named by REC_CREATED_COLUMN_NAME and
    REC_UPDATED_COLUMN_NAME; each is filled with ``current_timestamp()``.

    Args:
        df (DataFrame): The DataFrame to extend.

    Returns:
        DataFrame: ``df`` with the two audit columns added.
    """
    df_with_audit = df
    # Created first, then updated: same column order as the chained original.
    for audit_column_name in (REC_CREATED_COLUMN_NAME, REC_UPDATED_COLUMN_NAME):
        df_with_audit = df_with_audit.withColumn(audit_column_name, current_timestamp())
    return df_with_audit

In [9]:
# S3 client pointed at the configured endpoint.
s3 = boto3.client(
    's3',
    endpoint_url=s3_endpoint,
    aws_access_key_id=s3_access_key,
    aws_secret_access_key=s3_secret_key,
)

# Walk every result page under the source prefix and collect the object keys.
# Pages without a 'Contents' entry contribute nothing.
paginator = s3.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=f"{environment}-{source_bucket}", Prefix=source_folder)
file_list = [
    item['Key']
    for result in pages
    for item in result.get('Contents', [])
]

print(file_list)

['files/pscore/ascll/DATA_TRAFFIC_85994_99022_20240312201006.cdr', 'files/pscore/ascll/DATA_TRAFFIC_86085_32451_20240313113000.cdr']


In [10]:
# Source contract: every field is declared as a string, in file order,
# followed by a single timestamp field (event_date).
string_column_names = [
    "record_type",
    "network_initiated_pdp_context",
    "imsi",
    "msisdn",
    "imei",
    "charging_id",
    "ggsn_pgw_address",
    "sgsn_sgw_address",
    "ms_nw_capability",
    "pdp_pdn_type",
    "served_pdp_address",
    "dynamic_address_flag",
    "access_point_name_ni",
    "record_sequence_number",
    "record_sequence_number_meg",
    "node_id",
    "local_sequence_number",
    "charging_characteristics",
    "record_opening_time",
    "duration",
    "rat_type",
    "cause_for_record_closing",
    "diagnostic",
    "volume_uplink",
    "volume_downlink",
    "total_volume",
    "lac_or_tac",
    "ci_or_eci",
    "rac",
    "rnc_unsent_data_volume",
    "req_alloc_ret_priority",
    "neg_alloc_ret_priority",
    "req_traffic_class",
    "neg_traffic_class",
    "qci",
    "req_max_bitrate_uplink",
    "req_max_bitrate_downlink",
    "req_guar_bitrate_uplink",
    "req_guar_bitrate_downlink",
    "neg_max_bitrate_uplink",
    "neg_max_bitrate_downlink",
    "neg_guar_bitrate_uplink",
    "neg_guar_bitrate_downlink",
    "mccmnc",
    "country_name",
    "input_filename",
    "output_filename",
]

df_source_schema = StructType(
    [StructField(column_name, StringType()) for column_name in string_column_names]
    + [StructField("event_date", TimestampType())]
)

In [11]:
# Empty frame carrying the contract schema; the loading loop unions each
# accepted file onto it.
df_source_data = spark.createDataFrame([], schema=df_source_schema)

# Width of the contract = number of fields declared in the source schema.
num_columns_contract = len(df_source_schema.fields)
print("Number of columns of contract:", num_columns_contract)

Number of columns of contract: 48


In [12]:
# Read every listed source file and keep only those that honour the column contract.
def _new_csv_reader():
    """Return a fresh reader for ';'-delimited CSV files without a header row.

    A new reader is built per call so the schema-less probe and the typed read
    below never share reader state.
    """
    return spark.read.format("csv") \
                     .option("header", "false") \
                     .option("delimiter", ";")


for file_name in file_list:

    print(f'File in processing: {file_name}')
    file_path = f"s3://{environment}-{source_bucket}/{file_name}"

    # Probe the file WITHOUT a schema to learn how many columns it really has.
    # The previous check compared len(df.columns) of a frame that had been read
    # with the contract schema applied, so it was always true and validated nothing.
    num_columns_file = len(_new_csv_reader().load(file_path).columns)

    if num_columns_file == num_columns_contract:
        print('No of columns matched')
        df = _new_csv_reader().schema(df_source_schema).load(file_path)
        df_source_data = df_source_data.union(df)
    else:
        # Make rejected files visible instead of silently ignoring them.
        print(f'File skipped: expected {num_columns_contract} columns, found {num_columns_file}')

# Stamp the accumulated rows with the audit columns before writing to bronze.
df_send_to_bronze = audit_add_column(df_source_data)

File in processing: files/pscore/ascll/DATA_TRAFFIC_85994_99022_20240312201006.cdr


24/03/21 14:31:00 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


No of columns matched
File in processing: files/pscore/ascll/DATA_TRAFFIC_86085_32451_20240313113000.cdr
No of columns matched


In [13]:
# Storage location of the destination database inside the lakehouse bucket.
db_location = f's3://{environment}-{lakehouse_bucket}/{dest_db_catalog}/{dest_db_schema}/'

# Create the destination database only when it does not exist yet.
sql_db_create = f"""
CREATE DATABASE IF NOT EXISTS {dest_final_db} COMMENT '' LOCATION '{db_location}'
"""
print(sql_db_create)
spark.sql(sql_db_create)


CREATE DATABASE IF NOT EXISTS iceberg.bronze COMMENT '' LOCATION 's3://prd-lakehouse/iceberg/bronze/'



DataFrame[]

In [14]:
# DDL statement that removes the destination table when it exists.
sql_ddl_drop_table = (
    "\n"
    f"    DROP TABLE IF EXISTS {dest_final_table}\n"
)

In [15]:
# List the catalogs known to the session.
# The result used to be stored in `show_databases_df`, which mislabelled it;
# the SHOW DATABASES cell still defines that name with the right content.
show_catalogs_df = spark.sql("SHOW CATALOGS")
show_catalogs_df.show()

+-------------+
|      catalog|
+-------------+
|      iceberg|
|spark_catalog|
+-------------+



In [16]:
# List the databases (namespaces) visible to the session and print them.
show_databases_df = spark.sql("SHOW DATABASES")
show_databases_df.show()

+---------+
|namespace|
+---------+
|   bronze|
|  default|
+---------+



In [17]:
# List the tables of the destination schema. The schema name comes from the
# configuration cell instead of a hard-coded 'bronze', so the listing follows
# the configured destination.
show_tables_df = spark.sql(f"SHOW TABLES in {dest_db_schema}")
show_tables_df.show()

[Stage 0:>                                                          (0 + 1) / 1]

+---------+----------+-----------+
|namespace| tableName|isTemporary|
+---------+----------+-----------+
|   bronze|pscore_sgw|      false|
+---------+----------+-----------+



                                                                                

In [18]:
# DDL for the destination Iceberg table: the source-contract columns followed
# by the two audit columns. The audit column names come from the same constants
# audit_add_column() uses, so the table definition and the written DataFrame
# cannot drift apart if a constant is renamed.
sql_ddl_create_table = f"""
        create table if not exists {dest_final_table}
        (
            record_type STRING,
            network_initiated_pdp_context STRING,
            imsi STRING,
            msisdn STRING,
            imei STRING,
            charging_id STRING,
            ggsn_pgw_address STRING,
            sgsn_sgw_address STRING,
            ms_nw_capability STRING,
            pdp_pdn_type STRING,
            served_pdp_address STRING,
            dynamic_address_flag STRING,
            access_point_name_ni STRING,
            record_sequence_number STRING,
            record_sequence_number_meg STRING,
            node_id STRING,
            local_sequence_number STRING,
            charging_characteristics STRING,
            record_opening_time STRING,
            duration STRING,
            rat_type STRING,
            cause_for_record_closing STRING,
            diagnostic STRING,
            volume_uplink STRING,
            volume_downlink STRING,
            total_volume STRING,
            lac_or_tac STRING,
            ci_or_eci STRING,
            rac STRING,
            rnc_unsent_data_volume STRING,
            req_alloc_ret_priority STRING,
            neg_alloc_ret_priority STRING,
            req_traffic_class STRING,
            neg_traffic_class STRING,
            qci STRING,
            req_max_bitrate_uplink STRING,
            req_max_bitrate_downlink STRING,
            req_guar_bitrate_uplink STRING,
            req_guar_bitrate_downlink STRING,
            neg_max_bitrate_uplink STRING,
            neg_max_bitrate_downlink STRING,
            neg_guar_bitrate_uplink STRING,
            neg_guar_bitrate_downlink STRING,
            mccmnc STRING,
            country_name STRING,
            input_filename STRING,
            output_filename STRING,
            event_date Timestamp,
            {REC_CREATED_COLUMN_NAME} timestamp,
            {REC_UPDATED_COLUMN_NAME} timestamp
        )
        using iceberg
        """

In [20]:
# MinIO connection settings: reuse the values from the configuration cell
# instead of repeating the credential literals here. boto3 is already imported
# in the imports cell at the top of the notebook.
minio_access_key = s3_access_key
minio_secret_key = s3_secret_key
minio_endpoint = s3_endpoint  # e.g., http://minio.example.com:9000

# Initialize the MinIO client
# NOTE(review): verify=False disables TLS certificate verification. It has no
# effect on a plain-http endpoint; revisit it before pointing this at https.
minio_client = boto3.client('s3',
                            endpoint_url=minio_endpoint,
                            aws_access_key_id=minio_access_key,
                            aws_secret_access_key=minio_secret_key,
                            region_name='us-east-1',  # Specify a region (does not matter for MinIO)
                            verify=False)  # Set to False if you're using self-signed SSL certificates

# List buckets
response = minio_client.list_buckets()

# Print bucket names
print("Buckets:")
for bucket in response['Buckets']:
    print(bucket['Name'])

Buckets:
dev-lakehouse
dev-landing-zone
prd-lakehouse
prd-landing-zone


In [21]:
# #### SQL DDL Execution
## drop table
# Runs the DROP TABLE IF EXISTS statement held in sql_ddl_drop_table.
# NOTE(review): this removes the destination table on every run, so the later
# append writes into a freshly created table.
spark.sql(sql_ddl_drop_table)

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


DataFrame[]

In [22]:
# #### SQL DDL Execution
## create table
# Runs the "create table if not exists ... using iceberg" statement held in
# sql_ddl_create_table.
spark.sql(sql_ddl_create_table)

24/03/21 14:31:14 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


DataFrame[]

In [23]:
# Simple select: preview the imsi column of the frame about to be written.
# NOTE(review): imsi looks like a subscriber identifier; consider clearing this
# output before sharing the notebook.
imsi_preview = df_send_to_bronze.select('imsi')
imsi_preview.show()

# ### Write table
# Write the data to the lakehouse by appending it to the destination table.
bronze_writer = df_send_to_bronze.writeTo(dest_final_table)
bronze_writer.append()

                                                                                

+---------------+
|           imsi|
+---------------+
|226050085342734|
|310280073297191|
|222500015506415|
|222013105410847|
|222018503951163|
|204046209804910|
|440103068027552|
|234304199485641|
|234304143588372|
|240020710114643|
|240076511401701|
|208090082723695|
|214015522124743|
|208090063587228|
|208090063974865|
|234159504296546|
|255065004702895|
|260021121304281|
|246020102946580|
|272034010341309|
+---------------+
only showing top 20 rows



                                                                                

In [24]:
# Load the destination table and print its schema.
table = spark.table(dest_final_table)
# printSchema() writes the schema to stdout itself and returns None, so wrapping
# it in print() only added a stray "None" line after the schema.
table.printSchema()

root
 |-- record_type: string (nullable = true)
 |-- network_initiated_pdp_context: string (nullable = true)
 |-- imsi: string (nullable = true)
 |-- msisdn: string (nullable = true)
 |-- imei: string (nullable = true)
 |-- charging_id: string (nullable = true)
 |-- ggsn_pgw_address: string (nullable = true)
 |-- sgsn_sgw_address: string (nullable = true)
 |-- ms_nw_capability: string (nullable = true)
 |-- pdp_pdn_type: string (nullable = true)
 |-- served_pdp_address: string (nullable = true)
 |-- dynamic_address_flag: string (nullable = true)
 |-- access_point_name_ni: string (nullable = true)
 |-- record_sequence_number: string (nullable = true)
 |-- record_sequence_number_meg: string (nullable = true)
 |-- node_id: string (nullable = true)
 |-- local_sequence_number: string (nullable = true)
 |-- charging_characteristics: string (nullable = true)
 |-- record_opening_time: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- rat_type: string (nullable = true)
 |-- 

In [25]:
# Count the rows currently in the destination table and report the total.
record_count = table.count()
print(f"No of Records: {record_count}")

No of Records: 131473
