In [0]:
# Inline function to reorder columns
def reord_cols(df):
    return df.select("id", "custprofession", "custage", "custlname", "custfname")

# Inline function to enrich data
def enrich(df):
    enrich_addcols_df6 = df.withColumn("curdt", current_date()).withColumn("loadts", current_timestamp())
    enrich_ren_df7 = enrich_addcols_df6.withColumnRenamed("srcsystem", "src")
    enrich_combine_df8 = enrich_ren_df7.withColumn("nameprof", concat("custfname", lit(" is a "), "custprofession")).drop("custfname")
    enrich_combine_split_df9 = enrich_combine_df8.withColumn("custfname", split("nameprof", ' ')[0])
    enrich_combine_split_cast_reformat_df10 = enrich_combine_split_df9.withColumn("custage",regexp_replace("custage","[-]",'')).withColumn("curdtstr", col("curdt").cast("string"))\
        .withColumn("year", year(col("curdt"))).withColumn("curdtstr", concat(substring("curdtstr", 3, 2), lit("/"), substring("curdtstr", 6, 2)))\
        .withColumn("dtfmt", date_format("curdt", 'yyyy/MM/dd hh:mm:ss'))
    return enrich_combine_split_cast_reformat_df10

# Inline function for pre-wrangling
def pre_wrangle(df):
    return df.select("id", "custprofession", "custage", "src", "curdt")\
        .groupBy("custprofession")\
        .agg(avg("custage").alias("avgage"))\
        .where("avgage>49")\
        .orderBy("custprofession")

# Inline function for pre-wrangling analysis
def prewrang_anal(df):
    sample1 = df.sample(0.2, 10)
    smry = df.summary()
    coorval = df.corr("custage", "custage")
    covval = df.cov("custage", "custage")
    freqval = df.freqItems(["custprofession", "agegroup"], 0.4)
    return sample1, smry, coorval, covval, freqval

# Inline function to aggregate data
def aggregate_data(df):
    return df.groupby("year", "agegroup", "custprofession").agg(max("curdt").alias("max_curdt"), min("curdt").alias("min_curdt"),
                                                                 avg("custage").alias("avg_custage"), mean("custage").alias("mean_age"),
                                                                 countDistinct("custage").alias("distinct_cnt_age"))\
             .orderBy("year", "agegroup", "custprofession", ascending=[False, True, False])

# Inline function to standardize columns
def standardize_cols(df):
    srcsys = 'Retail'
    reord_added_df3 = df.withColumn("srcsystem", lit(srcsys))
    reord_added_replaced_df4 = reord_added_df3.withColumn("custfname", col("custlname"))
    chgnumcol_reord_df5 = reord_added_replaced_df4.drop("custlname")
    return chgnumcol_reord_df5

# Function to return a predefined schema
def ret_struct():
    return StructType([StructField("id", IntegerType(), False),
                       StructField("custfname", StringType(), False),
                       StructField("custlname", StringType(), True),
                       StructField("custage", ShortType(), True),
                       StructField("custprofession", StringType(), True)])


In [0]:
def main(arg):
    print("Define Spark session object (inline code)")
    spark = SparkSession.builder\
        .appName("Very Important SQL End to End App")\
        .config("spark.jars", "dbfs:/FileStore/config/mysql_connector_java.jar")\
        .enableHiveSupport()\
        .getOrCreate()

    print("Set the logger level to error")
    spark.sparkContext.setLogLevel("ERROR")

    print("1. Data Munging")
    custstructtype1 = ret_struct()
    custdf_clean = spark.read.csv(arg[1], mode='dropmalformed', schema=custstructtype1)
    custdf_optimized = custdf_clean.repartition(4).cache()

    print("Dropping Duplicates of cust data")
    dedup_dropduplicates_df = custdf_optimized.dropDuplicates(["custage", "id"])
    
    txnsstructtype2 = StructType([StructField("txnid", IntegerType(), False),
                                  StructField("dt", StringType()),
                                  StructField("custid", IntegerType()),
                                  StructField("amt", DoubleType()),
                                  StructField("category", StringType()),
                                  StructField("product", StringType()),
                                  StructField("city", StringType()),
                                  StructField("state", StringType()),
                                  StructField("spendby", StringType())])

    txns = spark.read.csv(arg[2], mode='dropmalformed', schema=txnsstructtype2)
    txns_clean_optimized = txns.repartition(1).cache()
    
    print("Dropping Duplicates of txns data")
    txns_dedup = txns_clean_optimized.dropDuplicates(["dt", "amt", "txnid"])

    print("Data Preparation (Cleansing & Scrubbing)")
    prof_dict = {"Therapist": "Physician", "Musician": "Music Director", "na": "prof not defined"}
    dedup_dropfillreplacena_clensed_scrubbed_df1 = dedup_dropduplicates_df.na.replace(prof_dict, subset=["custprofession"])
    dedup_dropfillreplacena_clensed_scrubbed_df1.show()

    print("Data Standardization (column)")
    reord_df2 = reord_cols(dedup_dropfillreplacena_clensed_scrubbed_df1)
    munged_df = standardize_cols(reord_df2)
    munged_df.show()

    print("Data Enrichment (values)")
    munged_enriched_df = enrich(munged_df)
    munged_enriched_df.show()

    print("Data Customization & Processing (Business logics)")
    from pyspark.sql.functions import udf
    age_custom_validation = udf(lambda x: 'Adult' if x > 18 else 'Child')
    custom_agegrp_munged_enriched_df = munged_enriched_df.withColumn("custage",coalesce("custage",lit(0)).cast("int")).withColumn("agegroup", age_custom_validation("custage"))
    custom_agegrp_munged_enriched_df.show()

    print("Core Data Processing/Transformation (Level1) (Pre Wrangling) Curation")
    pre_wrangled_customized_munged_enriched_df = pre_wrangle(custom_agegrp_munged_enriched_df)
    pre_wrangled_customized_munged_enriched_df.show()

    filtered_nochildren_rowcol_df_for_further_wrangling1 = custom_agegrp_munged_enriched_df.filter("agegroup <> 'Child'")\
        .select("id", "custage", "curdt", "custfname", "year", "agegroup")
    filtered_nochildren_rowcol_df_for_further_wrangling1.show()

    dim_year_agegrp_prof_metrics_avg_mean_max_min_distinctCount_count_for_consumption2 = custom_agegrp_munged_enriched_df.filter("agegroup <> 'Child'")
    aggr_df = aggregate_data(dim_year_agegrp_prof_metrics_avg_mean_max_min_distinctCount_count_for_consumption2)
    aggr_df.show()

    aggr_filter_df = aggr_df.filter("avg_custage > 35")
    aggr_filter_df.show()

    print("Analytical Functionalities")
    sampledf, summarydf, corrval, covval, freqdf = prewrang_anal(custom_agegrp_munged_enriched_df)
    sampledf.show()
    summarydf.show()
    print(f"Correlation value of age is {corrval}")
    print(f"Covariance value of age is {covval}")
    freqdf.show()

    masked_df = custom_agegrp_munged_enriched_df.withColumn("custfname", md5(col("custfname")))
    masked_df.show()

    print("Core Data Curation/Processing/Transformation (Level2) Data Wrangling")
    denormalizeddf = custdf_clean.alias("c").join(txns_dedup.alias("t"), col("c.id") == col("t.custid"), "inner")
    denormalizeddf.show()
    rno_txns3 = denormalizeddf.select("*", row_number().over(Window.orderBy("dt")).alias("sno"))
    rno_txns3.show()

    print("Data Persistence & Consumption")
    print("final df output")
    custom_agegrp_munged_enriched_df.show()
    custom_agegrp_munged_enriched_df.write.mode("overwrite").saveAsTable("cust_final_tbl")
    aggr_filter_df.write.mode("overwrite").json("/mnt/drive/ETLresult")
    #rno_txns3.write.format("jdbc").option("url", "jdbc:mysql://localhost/custdb").option("dbtable", "custtxns")\
    #    .option("user", "root").option("password", "password").option("driver", "com.mysql.jdbc.Driver").save()
    
    print("ETL Processing Completed")
    spark.stop()




In [0]:
dbutils.widgets.text("cust_data_path", "")
dbutils.widgets.text("txns_data_path", "")
dbutils.widgets.text("connection_properties_path", "")

input_path1 = dbutils.widgets.get("cust_data_path")
input_path2 = dbutils.widgets.get("txns_data_path")
connection_properties_path3 = dbutils.widgets.get("connection_properties_path")

In [0]:
#dbutils.notebook.run("./reusable_functions", 30,{"param1":"- passed from the parent notebook"})
#dbutils.notebook.run("./reusable_functions", 30)

In [0]:
if __name__ == "__main__":
    import sys
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from pyspark.sql.window import *
    print(input_path1,input_path2,connection_properties_path3)
    arg = ["ETL Pipeline",input_path1,input_path2,connection_properties_path3]
    main(arg)

com.databricks.backend.common.rpc.SparkStoppedException: Spark down: 
	at com.databricks.backend.daemon.driver.DriverWrapper.executeCommandAndGetError(DriverWrapper.scala:693)
	at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:730)
	at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:556)
	at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:482)
	at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:290)
	at java.lang.Thread.run(Thread.java:750)