In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, FloatType, DateType

#Ingestion Process
def IngestCSV(path):
    schema = StructType([
        StructField("FirstName", StringType()),
        StructField("LastName", StringType()),
        StructField("Company", StringType()),
        StructField("BirthDate", IntegerType()),
        StructField("Salary", FloatType()),
        StructField("Address", StringType()),
        StructField("Suburb", StringType()),
        StructField("State", StringType()),
        StructField("Post", IntegerType()),
        StructField("Phone", IntegerType()),
        StructField("Mobile", IntegerType()),
        StructField("Email", StringType()),
    ])    
    df = (spark.read
    .format("csv")
    .option("mode", "PERMISSIVE")
    .schema(schema)
    .load(path, format="csv", sep="|", mode="PERMISSIVE", header="false")
    )
    return df

In [0]:
#Transform Process
def Transform(df):
    df = df.withColumn("FullName", F.concat_ws(" ", F.trim(F.col("FirstName")), F.trim(F.col("LastName"))))
    df = df.withColumn('SalaryBucket',  F.when(F.col('Salary') < 50000, F.lit("A"))
                                         .when((F.col('Salary') >= 50000) & (F.col('Salary') <= 100000), F.lit("B"))
                                         .otherwise(F.lit("C")))
                       
    df = df.withColumn('Salary', F.concat(F.lit("$"), F.format_number('Salary', 4).cast("string")))
    df = df.withColumn('BirthDateTemp', F.from_unixtime('BirthDate').cast(DateType()))
    df = df.withColumn('BirthDate', F.date_format(F.from_unixtime('BirthDate').cast(DateType()), 'dd/MM/yyyy'))
    df = df.withColumn('age', (F.months_between(F.current_date(), F.col('BirthDateTemp')) / 12).cast('int'))
    df = df.drop("FirstName", "LastName", "BirthDateTemp")
    df.createOrReplaceTempView("member_data")       
    return df


In [0]:
#Load Process - Extract to JSON 
def Load(df):
    results = df.toPandas().to_json(orient='records')
    print(results)

In [0]:
#ETL Pipeline Run
Load(Transform(IngestCSV("abfss://test.dfs.core.windows.net/member-data.csv")))


[{"Company":"Brandt, Jonathan F Esq","BirthDate":"05\/07\/1970","Salary":"$330,949.2188","Address":"171 E 24th St","Suburb":"Leith","State":"TAS","Post":7315,"Phone":381749123,"Mobile":458665290,"Email":"rebbecca.didio@didio.com.au","FullName":"Rebbecca Didio","SalaryBucket":"C","age":54.0},{"Company":"Landrum Temporary Services","BirthDate":"17\/07\/1970","Salary":"$309,558.5312","Address":"22222 Acoma St","Suburb":"Proston","State":"QLD","Post":4613,"Phone":799973366,"Mobile":497622620,"Email":"stevie.hallo@hotmail.com","FullName":"Stevie Hallo","SalaryBucket":"C","age":54.0},{"Company":"Inabinet, Macre Esq","BirthDate":"24\/01\/1970","Salary":"$539,508.3750","Address":"534 Schoenborn St #51","Suburb":"Hamel","State":"WA","Post":6215,"Phone":855589019,"Mobile":427885282,"Email":"mariko_stayer@hotmail.com","FullName":"Mariko Stayer","SalaryBucket":"C","age":54.0},{"Company":"Morris Downing & Sherred","BirthDate":"12\/06\/1970","Salary":"$293,515.5000","Address":"69206 Jackson Ave","Su

In [0]:
%sql
select * from member_data

Company,BirthDate,Salary,Address,Suburb,State,Post,Phone,Mobile,Email,FullName,SalaryBucket,age
"Brandt, Jonathan F Esq",05/07/1970,"$330,949.2188",171 E 24th St,Leith,TAS,7315,381749123,458665290,rebbecca.didio@didio.com.au,Rebbecca Didio,C,54.0
Landrum Temporary Services,17/07/1970,"$309,558.5312",22222 Acoma St,Proston,QLD,4613,799973366,497622620,stevie.hallo@hotmail.com,Stevie Hallo,C,54.0
"Inabinet, Macre Esq",24/01/1970,"$539,508.3750",534 Schoenborn St #51,Hamel,WA,6215,855589019,427885282,mariko_stayer@hotmail.com,Mariko Stayer,C,54.0
Morris Downing & Sherred,12/06/1970,"$293,515.5000",69206 Jackson Ave,Talmalmo,NSW,2640,260444682,443795912,gerardo_woodka@hotmail.com,Gerardo Woodka,C,54.0
"Buelt, David L Esq",21/08/1970,"$395,121.2500",808 Glen Cove Ave,Lane Cove,NSW,1595,214556085,453666885,mayra.bena@gmail.com,Mayra Bena,C,54.0
Artesian Ice & Cold Storage Co,14/12/1970,"$573,376.9375",373 Lafayette St,Cartmeticup,WA,6316,878681355,451966921,idella@hotmail.com,Idella Scotland,C,53.0
Midway Hotel,05/07/1970,"$896,019.7500",87 Sylvan Ave,Nyamup,WA,6258,865228931,427991688,sklar@hotmail.com,Sherill Klar,C,54.0
"Selsor, Robert J Esq",13/01/1970,"$863,933.7500",60562 Ky Rt 321,Bendick Murrell,NSW,2803,252269402,415961606,ena_desjardiws@desjardiws.com.au,Ena Desjardiws,C,54.0
Vincent J Petti & Co,20/05/1970,"$756,980.3125",70 S 18th Pl,Purrawunda,QLD,4356,731849989,411732965,vince_siena@yahoo.com,Vince Siena,C,54.0
"Prentiss, Paul F Esq",03/12/1970,"$382,643.3438",8839 Ventura Blvd,Blanchetown,SA,5357,868904661,461862457,tjarding@hotmail.com,Theron Jarding,C,53.0
