In [13]:
import sys
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import when,lit,col 

In [2]:
# Make the workspace directory importable so the local `load_env` helper resolves.
sys.path.append("/opt/workspace")
from load_env import env_parameter

# Read the settings loaded from .env and bind the ones this notebook needs.
env_vars = env_parameter()
spark_master, event_log_dir, minio_user, minio_password = (
    env_vars[setting_key]
    for setting_key in ("spark_master", "event_log_dir", "minio_user", "minio_pass")
)

In [21]:
# Display the event-log directory value that was read from the environment settings.
event_log_dir

'file:/tmp'

In [4]:
# Start (or reuse) the Spark session for the SAP bronze load.
# The hadoop-aws package is requested through spark.jars.packages so that
# s3a:// paths can be used against the MinIO endpoint configured below.
spark = (
    SparkSession.builder
    .appName("SAP_BRONZE")
    .master(spark_master)
    .config("spark.jars.packages", ",".join([
        "org.apache.hadoop:hadoop-aws:3.3.2"
    ]))
    # S3A connection: endpoint and credentials taken from the environment settings.
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", minio_user)
    .config("spark.hadoop.fs.s3a.secret.key", minio_password)
    # Path-style access, explicit S3A filesystem class, and SSL disabled
    # to match the plain-http endpoint above.
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    # Event logging is switched on and written to the configured directory.
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", event_log_dir)
    .getOrCreate()
)

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6de04db1-baf4-4e04-86af-f53302ec826d;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.1026 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 253ms :: artifacts dl 7ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.1026 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.2 from central in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------

In [5]:
# Load the ADRC extract from the landing bucket; the first row is treated as
# the header and column types are inferred by Spark.
df_landing = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv("s3a://landing/SAP/adrc.csv")
)

25/09/28 08:41:06 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [6]:
# Preview a single landing record in vertical layout.
df_landing.show(n=1, truncate=True, vertical=True)

25/09/28 08:41:15 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


-RECORD 0-------------------------------
 client           | 200                 
 addrnumber       | 15211               
 date_from        | 2001-01-01          
 nation           | NULL                
 date_to          | 9999-12-31          
 title            | NULL                
 name1            | Ada Burchette       
 name2            | NULL                
 name3            | NULL                
 name4            | NULL                
 name_text        | NULL                
 name_co          | NULL                
 city1            | New York            
 city2            | NULL                
 city_code        | NULL                
 cityp_code       | NULL                
 home_city        | NULL                
 cityh_code       | NULL                
 chckstatus       | NULL                
 regiogroup       | NULL                
 post_code1       | 10001               
 post_code2       | NULL                
 post_code3       | NULL                
 pcode1_ext     

# Datetime issue
- According to the legacy system, some datetime columns need to be fixed at the bronze layer:
- date_from
- date_to

In [7]:
# Set the Parquet datetime rebase mode used on write to LEGACY for this session.
# NOTE(review): presumably required because the data holds very old sentinel
# dates such as 0001-01-01 — confirm against the Spark Parquet write behaviour.
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")

In [8]:
# df_landing.write \
#     .mode("overwrite") \
#     .parquet("s3a://bronze/sap/adrc/")

In [9]:
# Read the Parquet data stored under the bronze sap/adrc path.
df_read_info = spark.read.parquet("s3a://bronze/sap/adrc/")

In [10]:
# Preview a single bronze record, untruncated, in vertical layout.
df_read_info.show(n=1, truncate=False, vertical=True)

-RECORD 0------------------------------------
 client           | 800                      
 addrnumber       | 38600                    
 date_from        | 0001-01-01               
 nation           | NULL                     
 date_to          | 9999-12-31               
 title            | NULL                     
 name1            | MODE Technologies        
 name2            | 26                       
 name3            | NULL                     
 name4            | NULL                     
 name_text        | NULL                     
 name_co          | NULL                     
 city1            | Berlin                   
 city2            | NULL                     
 city_code        | NULL                     
 cityp_code       | NULL                     
 home_city        | NULL                     
 cityh_code       | NULL                     
 chckstatus       | NULL                     
 regiogroup       | NULL                     
 post_code1       | 10825         

In [19]:
# Replace legacy sentinel dates while keeping each column's original data type.
# A bare string literal mixed with the source column inside when/otherwise lets
# Spark's implicit type coercion turn the result into a string column, so each
# replacement value is cast to the type the column already has in df_read_info.
# NOTE(review): the date_to rule rewrites 9999-01-01 to 9999-12-31; confirm that
# 9999-01-01 is the intended legacy sentinel.
df_modify = df_read_info.withColumn(
    "date_from",
    when(
        col("date_from") == "0001-01-01",
        lit("1900-01-01").cast(df_read_info.schema["date_from"].dataType),
    ).otherwise(col("date_from")),
).withColumn(
    "date_to",
    when(
        col("date_to") == "9999-01-01",
        lit("9999-12-31").cast(df_read_info.schema["date_to"].dataType),
    ).otherwise(col("date_to")),
)

In [20]:
# Preview a single corrected record in vertical layout.
df_modify.show(n=1, truncate=True, vertical=True)

-RECORD 0--------------------------------
 client           | 800                  
 addrnumber       | 38600                
 date_from        | 1900-01-01           
 nation           | NULL                 
 date_to          | 9999-12-31           
 title            | NULL                 
 name1            | MODE Technologies    
 name2            | 26                   
 name3            | NULL                 
 name4            | NULL                 
 name_text        | NULL                 
 name_co          | NULL                 
 city1            | Berlin               
 city2            | NULL                 
 city_code        | NULL                 
 cityp_code       | NULL                 
 home_city        | NULL                 
 cityh_code       | NULL                 
 chckstatus       | NULL                 
 regiogroup       | NULL                 
 post_code1       | 10825                
 post_code2       | NULL                 
 post_code3       | NULL          