In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql import Window
from pyspark.sql.types import datetime
import sys
import os
import itertools
from datetime import datetime
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DateType
from pyspark.sql.functions import to_date
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import from_unixtime

# Create (or reuse) the Spark session shared by every cell below.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .getOrCreate()
)

# Folder that holds Config_ADM.csv, supplied by the job environment.
CONFIG_PATH = os.environ.get('CONFIG_PATH')
if not CONFIG_PATH:
    # Fail here with a clear message instead of a TypeError when the next
    # cell evaluates CONFIG_PATH + "Config_ADM.csv" with CONFIG_PATH = None.
    raise RuntimeError("CONFIG_PATH environment variable is not set")
# Later cells append file names directly, so the folder must end with '/'.
if not CONFIG_PATH.endswith('/'):
    CONFIG_PATH += '/'
print(CONFIG_PATH)

gs://sjsu_cdw/config_files/


In [2]:
# Read Config_ADM.csv (with header) and collect it to the driver as a list of Row objects.
df_config = (
    spark.read
    .option("header", "true")
    .csv(CONFIG_PATH + "Config_ADM.csv")
    .collect()
)
# Source folder: first config row.
SOURCEFILEPATH = df_config[0]['SourceFile']
# Dimension-staging folder: second config row (IndexError if the file has fewer than two data rows).
TARGETFILEPATH = df_config[1]['adm_dim_path']
# Base name of the staging output for this dimension.
fileName = "CS_DS_AGROUP"
# Name of the surrogate-key column added later with row_number().
SEQUENCE_ID = df_config[0]['monotonically_inc_id']

print(SOURCEFILEPATH, TARGETFILEPATH, sep="\n")

gs://sjsu_cdw/raw_data/Admission/
gs://sjsu_cdw/cdw_incremantal_load/TransformedData/Admissions/Dimension_Staging/


In [3]:
# Raw tables to load. Each one is read from SOURCEFILEPATH + "<name>.csv"
# and exposed to Spark SQL under the same name.
sourceTablelist = [
    "PS_ACAD_GROUP_TBL",
]
# Staging output location: the configured staging folder (TARGETFILEPATH)
# followed by the hard-coded fileName set when the config was loaded.
targetFileName = TARGETFILEPATH + fileName

In [4]:
import pyspark.sql.functions as func
#Iterate through the source table list and load the data into a Spark dataframe. 
def createTempTables(sparkSess, sourceTablelist, sourcePath=None):
    """Load each source CSV into a DataFrame and register it as a temp view.

    Each table ``name`` is read from ``<sourcePath><name>.csv`` with:
    - a header row;
    - inferred column types;
    - a single space (" ") treated as NULL.

    The DataFrame is then registered as a temporary view called ``name``, so
    the Spark SQL cells can query it.

    Args:
        sparkSess: active SparkSession used for reading.
        sourceTablelist: iterable of table names, which are also the CSV
            base names.
        sourcePath: folder prefix for the CSV files. Defaults to the
            notebook-level ``SOURCEFILEPATH`` read from the config file.

    Returns:
        The DataFrame of the last table in ``sourceTablelist``, or ``None``
        when the list is empty. Previously an empty list raised
        UnboundLocalError.
    """
    prefix = SOURCEFILEPATH if sourcePath is None else sourcePath
    df_source = None
    for tabList in sourceTablelist:
        df_source = (
            sparkSess.read
            .option("header", "true")
            .option("inferSchema", "true")
            .option("nullValue", " ")
            .csv(prefix + tabList + ".csv")
        )
        # createOrReplaceTempView is the non-deprecated replacement for
        # registerTempTable, with the same replace-if-exists semantics.
        df_source.createOrReplaceTempView(tabList)
    return df_source
# Load the raw CSVs as temp views; keep the DataFrame of the last table loaded.
df_source = createTempTables(sparkSess=spark, sourceTablelist=sourceTablelist)


In [6]:
# Text layout of PS_ACAD_GROUP_TBL.EFFDT in the raw CSV. In Spark datetime
# patterns 'MM' is month-of-year and 'mm' is minute-of-hour, so the previous
# 'dd/mm/yyyy' never parsed the month.
# NOTE(review): an earlier draft of this cell parsed EFFDT as 'M/dd/yyyy';
# confirm the day/month order against the raw extract.
EFFDT_FORMAT = "dd/MM/yyyy"

# Keep only the latest effective-dated row for each (INSTITUTION, ACAD_GROUP):
#  * Keys are trimmed and defaulted before ranking, so rows are de-duplicated
#    on exactly the values that form UNIFICATION_ID.
#  * Rows are ranked on the parsed EFFDT, not the raw value. A day-first
#    string does not sort chronologically.
#  * DENSE_RANK keeps every row tied on the latest EFFDT.
#  * Rows whose EFFDT fails to parse sort last (DESC puts NULLs last).
df_staging = spark.sql(f"""
SELECT CS_D_AGROUP.INSTITUTION||'~'||CS_D_AGROUP.ACAD_GROUP AS UNIFICATION_ID,
CS_D_AGROUP.ACAD_GROUP,
CS_D_AGROUP.EFFDT,
CS_D_AGROUP.INSTITUTION,
CS_D_AGROUP.EFF_STATUS,
CS_D_AGROUP.DESCR,
CS_D_AGROUP.DESCRSHORT,
CS_D_AGROUP.STDNT_SPEC_PERM,
CS_D_AGROUP.AUTO_ENRL_WAITLIST
FROM
(
SELECT
COALESCE(RTRIM(LTRIM(PS_ACAD_GROUP_TBL.INSTITUTION)),'-') AS INSTITUTION,
COALESCE(RTRIM(LTRIM(PS_ACAD_GROUP_TBL.ACAD_GROUP)),'-') AS ACAD_GROUP,
COALESCE(TO_DATE(PS_ACAD_GROUP_TBL.EFFDT,'{EFFDT_FORMAT}'),'1900-01-01') AS EFFDT,
COALESCE(RTRIM(LTRIM(PS_ACAD_GROUP_TBL.EFF_STATUS)),'-') AS EFF_STATUS,
COALESCE(RTRIM(LTRIM(PS_ACAD_GROUP_TBL.DESCR)),'-') AS DESCR,
COALESCE(RTRIM(LTRIM(PS_ACAD_GROUP_TBL.DESCRSHORT)),'-') AS DESCRSHORT,
COALESCE(RTRIM(LTRIM(PS_ACAD_GROUP_TBL.STDNT_SPEC_PERM)),'-') AS STDNT_SPEC_PERM,
COALESCE(RTRIM(LTRIM(PS_ACAD_GROUP_TBL.AUTO_ENRL_WAITLIST)),'-') AS AUTO_ENRL_WAITLIST,
DENSE_RANK() OVER (
  PARTITION BY COALESCE(RTRIM(LTRIM(PS_ACAD_GROUP_TBL.INSTITUTION)),'-'),
               COALESCE(RTRIM(LTRIM(PS_ACAD_GROUP_TBL.ACAD_GROUP)),'-')
  ORDER BY TO_DATE(PS_ACAD_GROUP_TBL.EFFDT,'{EFFDT_FORMAT}') DESC
) AS DNS_RNK
FROM PS_ACAD_GROUP_TBL
) CS_D_AGROUP
WHERE CS_D_AGROUP.DNS_RNK=1
""")

# COALESCE with the '1900-01-01' string literal may yield a string column,
# so cast EFFDT to DateType explicitly.
df_staging = df_staging.withColumn("EFFDT", df_staging["EFFDT"].cast(DateType()))


In [7]:
# Persist the de-duplicated staging rows (no audit columns yet), replacing any previous output.
df_staging.write.mode("overwrite").option("header", "true").csv(targetFileName)
# Ordered column names, used to build the audit-column SELECT below.
column_list = df_staging.columns
def listToStr(lst):
    """Join the items of ``lst`` into one ', '-separated string.

    Each item is converted with ``str`` first, e.g. ``['A', 'B'] -> 'A, B'``.
    An empty sequence yields ``''``.
    """
    return ", ".join(map(str, lst))
# Comma-separated column list for the audit-column SELECT below.
columns = listToStr(column_list)

# Append audit columns to every staging row:
#  * INSERTED_DATE / UPDATED_DATE get the query's current timestamp;
#  * DELETED_DATE gets the far-future 9999-01-01.
# NOTE: this cell replaces df_staging. Re-run the SQL cell above before
# re-running it; otherwise the audit columns are duplicated.
df_staging.createOrReplaceTempView('temp')
df_staging = spark.sql("""
SELECT
{},
CURRENT_TIMESTAMP() INSERTED_DATE,
CURRENT_TIMESTAMP() UPDATED_DATE,
CAST('9999-01-01 00:00:00.000' AS TIMESTAMP) as DELETED_DATE

from temp
""".format(columns))

# Surrogate key: consecutive integers 1..N in monotonically_increasing_id
# order. A window with no PARTITION BY moves all rows into one partition;
# presumably acceptable for a lookup table this size. Revisit if it grows.
df_staging = df_staging.withColumn(
    SEQUENCE_ID,
    row_number().over(Window.orderBy(monotonically_increasing_id())),
)

# TODO (original note): needs to be moved to dim.

# Value used for each column of the "unspecified" row, keyed by the type name
# that DataFrame.dtypes reports. LongType is reported as 'bigint', so that key
# is listed alongside the original 'long'.
UNSPECIFIED_DEFAULTS = {
    'string': '0',
    'int': 0,
    'bigint': 0,
    'long': 0,
    'timestamp': '1900-01-01 01:01:01 UTC',
    'date': '1900-01-01',
}

columnname = []
datatype = []
for col_name, col_type in df_staging.dtypes:
    if col_type not in UNSPECIFIED_DEFAULTS:
        # Previously an unmapped type name (e.g. 'double') was silently
        # written into the row as its value; fail loudly instead.
        raise ValueError(
            "No unspecified-row default for column {!r} of type {!r}".format(col_name, col_type)
        )
    columnname.append(col_name)
    datatype.append(UNSPECIFIED_DEFAULTS[col_type])

# Build the one-row "unspecified" DataFrame with the notebook's own
# SparkSession. `sqlContext` is only predefined by some PySpark shells and
# raises NameError on a fresh kernel.
unspecifiedRow = spark.createDataFrame([datatype], columnname)
# union() matches columns by position; columnname preserves df_staging's order.
df_final = df_staging.union(unspecifiedRow)

# Final dimension output. Unlike the staging path, this folder is hard-coded
# rather than read from Config_ADM.csv. TODO: move it into the config file.
path = "gs://sjsu_cdw/cdw_incremantal_load/TransformedData/Admissions/Dimensions/"
dfile = 'CS_D_AGROUP'
df_final.write.option("header", "true").mode('overwrite').csv(path+dfile)