In [1]:
from pyspark.shell import spark

# Settings for this load: target table name, source CSV file name and the
# database (schema) the table lives in.
table_config = dict(
    name="marketing_campaigns_raw",
    csv_file="bronze_marketing_campaigns_raw.csv",
    database="bronze",
)

25/10/24 10:43:00 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.3
      /_/

Using Python version 3.11.6 (main, Oct  3 2023 11:57:02)
Spark context Web UI available at http://cfc35822a456:4040
Spark context available as 'sc' (master = spark://spark-master:7077, app id = app-20251024104258-0007).
SparkSession available as 'spark'.


In [2]:
# Unpack the load settings and build the database-qualified table name.
table_name, csv_file, database = (
    table_config[key] for key in ("name", "csv_file", "database")
)
full_table_name = f"{database}.{table_name}"

# Echo the resolved file name and table name, one per line.
print(csv_file, full_table_name, sep="\n")

bronze_marketing_campaigns_raw.csv
bronze.marketing_campaigns_raw


In [3]:
# Read the pipe-delimited source file. Schema inference is off, so every
# column is loaded as a string; the first line supplies the column names.
input_path = f"s3a://data/source_data/{csv_file}"
csv_reader = spark.read.options(sep='|', header=True, inferSchema=False, quote='"')
df = csv_reader.csv(input_path)

                                                                                

In [4]:
# Preview the freshly read rows without truncating long cell values.
df.show(truncate=False)

+-----------+-----------------------------+-------------+----------+----------+----------+--------+---------------------------------------------------------------------------------------+-----------+------------------+--------------------+-----------------------+--------------+
|campaign_id|campaign_name                |campaign_type|channel   |start_date|end_date  |budget  |target_audience                                                                        |creative_id|_source_system    |_ingestion_timestamp|_file_name             |_record_offset|
+-----------+-----------------------------+-------------+----------+----------+----------+--------+---------------------------------------------------------------------------------------+-----------+------------------+--------------------+-----------------------+--------------+
|CAMP001    |Fall Sale 2025               |email        |email     |2025-10-01|2025-10-31|15000.00|{"age_range":"25-45","interests":["fashion","home-decor"],"segme

In [5]:
from pyspark.sql.functions import to_timestamp, col
from pyspark.sql import DataFrame

# Replace the two string metadata columns with typed versions:
#   _ingestion_timestamp -> timestamp (parsed as "yyyy-MM-dd HH:mm:ss")
#   _record_offset       -> bigint
# The typed columns are placed after all untouched columns, timestamp first.
cast_targets = ("_ingestion_timestamp", "_record_offset")
df: DataFrame = df.select(
    *[name for name in df.columns if name not in cast_targets],
    to_timestamp(col("_ingestion_timestamp"), "yyyy-MM-dd HH:mm:ss").alias("_ingestion_timestamp"),
    col("_record_offset").astype("bigint").alias("_record_offset"),
)

In [6]:
# Check the column types after the casts.
df.printSchema()

root
 |-- campaign_id: string (nullable = true)
 |-- campaign_name: string (nullable = true)
 |-- campaign_type: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- target_audience: string (nullable = true)
 |-- creative_id: string (nullable = true)
 |-- _source_system: string (nullable = true)
 |-- _file_name: string (nullable = true)
 |-- _ingestion_timestamp: timestamp (nullable = true)
 |-- _record_offset: long (nullable = true)



In [7]:
# Column names of the target table, in the order SHOW COLUMNS reports them.
column_rows = (
    spark.sql(f"SHOW COLUMNS IN {full_table_name}")
    .select('col_name')
    .collect()
)
cols = [row.col_name for row in column_rows]

In [8]:
# Show the target table's column names collected above.
print(cols)

['campaign_id', 'campaign_name', 'campaign_type', 'channel', 'start_date', 'end_date', 'budget', 'target_audience', 'creative_id', '_source_system', '_file_name', '_record_offset', '_ingestion_timestamp']


In [9]:
# Reorder the frame's columns to follow the target table's column list.
df = df.select(*(df[column_name] for column_name in cols))

In [10]:
# Check the column order and types before writing.
df.printSchema()

root
 |-- campaign_id: string (nullable = true)
 |-- campaign_name: string (nullable = true)
 |-- campaign_type: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- target_audience: string (nullable = true)
 |-- creative_id: string (nullable = true)
 |-- _source_system: string (nullable = true)
 |-- _file_name: string (nullable = true)
 |-- _record_offset: long (nullable = true)
 |-- _ingestion_timestamp: timestamp (nullable = true)



In [11]:
# Write the frame into the existing target table.
# NOTE(review): insertInto is documented as matching columns by position, not
# by name - presumably why the frame was first re-selected in the table's
# column order; confirm.
# NOTE(review): no write mode is set here, so re-running this cell presumably
# appends the same rows again - confirm whether that is intended.
df.write.insertInto(full_table_name)

25/10/24 10:44:04 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
25/10/24 10:44:04 WARN RetryingMetaStoreClient: MetaStoreClient lost connection. Attempting to reconnect (1 of 1) after 1s. listPartitionsWithAuthInfo
org.apache.thrift.transport.TTransportException: Cannot write to null outputStream
	at org.apache.thrift.transport.TIOStreamTransport.write(TIOStreamTransport.java:142)
	at org.apache.thrift.protocol.TBinaryProtocol.writeI32(TBinaryProtocol.java:185)
	at org.apache.thrift.protocol.TBinaryProtocol.writeMessageBegin(TBinaryProtocol.java:116)
	at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:70)
	at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:62)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.send_get_partitions_ps_with_auth(ThriftHiveMetastore.java:2562)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_

In [12]:
# Read the table back to check the rows that were just inserted.
spark.sql(f"SELECT * FROM {full_table_name}").show()

+-----------+--------------------+-------------+----------+----------+----------+--------+--------------------+-----------+------------------+--------------------+--------------+--------------------+
|campaign_id|       campaign_name|campaign_type|   channel|start_date|  end_date|  budget|     target_audience|creative_id|    _source_system|          _file_name|_record_offset|_ingestion_timestamp|
+-----------+--------------------+-------------+----------+----------+----------+--------+--------------------+-----------+------------------+--------------------+--------------+--------------------+
|    CAMP001|      Fall Sale 2025|        email|     email|2025-10-01|2025-10-31|15000.00|{"age_range":"25-...|    CRE1001|marketing_platform|campaigns_2025100...|             0| 2025-10-01 08:00:00|
|    CAMP004|     Fitness October|        email|     email|2025-10-01|2025-10-31| 8000.00|{"age_range":"22-...|    CRE1004|marketing_platform|campaigns_2025100...|             1| 2025-10-01 08:00:00|


In [1]:
# Bronze tables whose row counts are reported below.
tables = [
    'campaign_events_raw',
    'customer_interactions_raw',
    'inventory_snapshots_raw',
    'marketing_campaigns_raw',
    'product_catalog_raw',
    'subscriptions_raw',
    'transaction_items_raw',
    'transactions_raw',
]


def _count_rows(table):
    """Return a one-row DataFrame (tt, cnt): the table name and its row count in bronze."""
    return spark.sql(
    f"""
    select '{table}' as tt,
    count(*) as cnt
    from bronze.{table}
    """)


# Stack the per-table counts into a single frame; the first table seeds it and
# every later one is unioned on.
df = None
for table in tables:
    table_counts = _count_rows(table)
    df = table_counts if df is None else df.union(table_counts)
df.show()

25/10/24 10:47:01 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

+--------------------+---+
|                  tt|cnt|
+--------------------+---+
| campaign_events_raw|  0|
|customer_interact...|  0|
|inventory_snapsho...|  0|
|marketing_campaig...|  0|
| product_catalog_raw|  0|
|   subscriptions_raw|  0|
|transaction_items...|  0|
|    transactions_raw|  0|
+--------------------+---+

