Copyright (c) Microsoft Corporation.

Licensed under the MIT License.

In [None]:
from pyspark.sql.types import * 
from pyspark.sql.functions import * 

In [None]:
programSchema = StructType([
                        StructField("CreatedBy",StringType(),True),
                        StructField("CreatedOn",TimestampType(),True),
                        StructField("CreatedOnBehalfBy",StringType(),True),
                        StructField("ImportSequenceNumber",IntegerType(),True),
                        StructField("ModifiedBy",StringType(),True),
                        StructField("ModifiedOn",TimestampType(),True),
                        StructField("ModifiedOnBehalfBy",StringType(),True),
                        StructField("ApplicationType",StringType(),True),
                        StructField("ContactTelephoneNumber",StringType(),True),
                        StructField("EligibilityRequirementsStatement",StringType(),True),
                        StructField("ExternalIdentifier",StringType(),True),
                        StructField("ExternalSystem",StringType(),True),
                        StructField("GovernmentOrganization",StringType(),True),
                        StructField("InformationContactTelephone",StringType(),True),
                        StructField("IntroductionDate",TimestampType(),True),
                        StructField("MissionStatement",StringType(),True),
                        StructField("PortalImageURL",StringType(),True),
                        StructField("PrivacyStatement",StringType(),True),
                        StructField("ProgramId",StringType(),True),
                        StructField("ProgramName",StringType(),True),
                        StructField("ProgramStatus",StringType(),True),
                        StructField("ProgramType",StringType(),True),
                        StructField("RestrictionsStatement",StringType(),True),
                        StructField("Service",StringType(),True),
                        StructField("SortOrder",IntegerType(),True),
                        StructField("URL",StringType(),True),
                        StructField("OverriddenCreatedOn",TimestampType(),True),
                        StructField("OwnerId",StringType(),True),
                        StructField("OwningBusinessUnit",StringType(),True),
                        StructField("OwningTeam",StringType(),True),
                        StructField("OwningUser",StringType(),True),
                        StructField("statecode",StringType(),True),
                        StructField("TimeZoneRuleVersionNumber",IntegerType(),True),
                        StructField("UTCConversionTimeZoneCode",IntegerType(),True),
                        StructField("VersionNumber",IntegerType(),True), 
                        StructField("Budget",DoubleType(),True),
                        StructField('ProgramSubType', StringType(), True),
                        StructField('TargetParticipants', IntegerType(), True),
                        StructField("Expenditure",DoubleType(),True)
                    ])

sql_str = ''' select * from loaddata_programs where Id is not null'''
df_programs = spark.sql(sql_str)

df_programs = df_programs.withColumn("CreatedOn", lit(None)) \
        .withColumn("CreatedOnBehalfBy", lit(None)) \
        .withColumn("ImportSequenceNumber", lit(None)) \
        .withColumn("ModifiedBy", lit(None)) \
        .withColumn("ModifiedOn", lit(None)) \
        .withColumn("ModifiedOnBehalfBy", lit(None)) \
        .withColumn("ApplicationType", lit(None)) \
        .withColumn("ContactTelephoneNumber", lit(None)) \
        .withColumnRenamed("Eligibility", "EligibilityRequirementsStatement") \
        .withColumn("ExternalIdentifier", lit(None)) \
        .withColumn("ExternalSystem", lit(None)) \
        .withColumn("GovernmentOrganization", lit(None)) \
        .withColumn("InformationContactTelephone", lit(None)) \
        .withColumn("IntroductionDate", lit(None)) \
        .withColumnRenamed('Mission', 'MissionStatement') \
        .withColumn("PortalImageURL", lit(None)) \
        .withColumn("PrivacyStatement", lit(None)) \
        .withColumnRenamed('Id', 'ProgramId') \
        .withColumnRenamed("Status", "ProgramStatus") \
        .withColumnRenamed('Name', 'ProgramName') \
        .withColumnRenamed('programTypeId','ProgramType') \
        .withColumn("RestrictionsStatement", lit(None)) \
        .withColumn("Service", lit(None)) \
        .withColumn("SortOrder", lit(None)) \
        .withColumn("URL", lit(None)) \
        .withColumn("OverriddenCreatedOn", lit(None)) \
        .withColumn("OwnerId", lit(None)) \
        .withColumn("OwningBusinessUnit", lit(None)) \
        .withColumn("statecode", lit(None)) \
        .withColumn("TimeZoneRuleVersionNumber", lit(None)) \
        .withColumn("UTCConversionTimeZoneCode", lit(None)) \
        .withColumn("VersionNumber", lit(None)) \
        .select("CreatedBy","CreatedOn", "CreatedOnBehalfBy", "ImportSequenceNumber", "ModifiedBy", "ModifiedOn", "ModifiedOnBehalfBy", 
                "ApplicationType", "ContactTelephoneNumber", "EligibilityRequirementsStatement", "ExternalIdentifier", 
                "ExternalSystem", "GovernmentOrganization", "InformationContactTelephone", "IntroductionDate", "MissionStatement", 
                "PortalImageURL", "PrivacyStatement", "ProgramId", "ProgramName", "ProgramStatus", "ProgramType", "RestrictionsStatement", 
                "Service", "SortOrder", "URL", "OverriddenCreatedOn", "OwnerId", "OwningBusinessUnit", "OwningTeam", "OwningUser", 
                "statecode", "TimeZoneRuleVersionNumber", "UTCConversionTimeZoneCode", "VersionNumber", "Budget", "ProgramSubType", "TargetParticipants", "Expenditure")
df_programs = spark.createDataFrame(df_programs.collect(), schema=programSchema)  
df_programs.write.mode('overwrite').saveAsTable('programs')
display(df_programs)


In [None]:
programTypeSchema = StructType([
                        StructField("CreatedBy",StringType(),True),
                        StructField("CreatedOn",TimestampType(),True),
                        StructField("CreatedOnBehalfBy",StringType(),True),
                        StructField("ImportSequenceNumber",IntegerType(),True),
                        StructField("ModifiedBy",StringType(),True),
                        StructField("ModifiedOn",TimestampType(),True),
                        StructField("ModifiedOnBehalfBy",StringType(),True),
                        StructField("ProgramTypeDescription",StringType(),True),
                        StructField("ProgramTypeId",StringType(),True),
                        StructField("ProgramTypeName",StringType(),True),
                        StructField("OverriddenCreatedOn",TimestampType(),True),
                        StructField("OwnerId",StringType(),True),
                        StructField("OwningBusinessUnit",StringType(),True),
                        StructField("OwningTeam",StringType(),True),
                        StructField("OwningUser",StringType(),True),
                        StructField("statecode",StringType(),True),
                        StructField("statuscode",StringType(),True),
                        StructField("TimeZoneRuleVersionNumber",IntegerType(),True),
                        StructField("UTCConversionTimeZoneCode",IntegerType(),True),
                        StructField("VersionNumber",IntegerType(),True)
                    ])
                    
sql_str = '''select * from loaddata_program_types where Id is not null'''
df_program_types = spark.sql(sql_str)

df_program_types = df_program_types \
                    .withColumn("CreatedOn",lit(None)) \
                    .withColumn("CreatedOnBehalfBy",lit(None)) \
                    .withColumn("ImportSequenceNumber",lit(None)) \
                    .withColumn("ModifiedBy",lit(None)) \
                    .withColumn("ModifiedOn",lit(None)) \
                    .withColumn("ModifiedOnBehalfBy",lit(None)) \
                    .withColumnRenamed("Description","ProgramTypeDescription") \
                    .withColumnRenamed("Id", "ProgramTypeId") \
                    .withColumn("OverriddenCreatedOn",lit(None)) \
                    .withColumn("OwnerId",lit(None)) \
                    .withColumn("OwningBusinessUnit",lit(None)) \
                    .withColumn("statecode",lit(None)) \
                    .withColumn("statuscode",lit(None)) \
                    .withColumn("TimeZoneRuleVersionNumber",lit(None)) \
                    .withColumn("UTCConversionTimeZoneCode",lit(None)) \
                    .withColumn("VersionNumber",lit(None)) \
                    .select("CreatedBy", "CreatedOn", "CreatedOnBehalfBy", "ImportSequenceNumber", "ModifiedBy", "ModifiedOn", "ModifiedOnBehalfBy", 
                    "ProgramTypeDescription", "ProgramTypeId", "ProgramTypeName", "OverriddenCreatedOn", "OwnerId", "OwningBusinessUnit", 
                    "OwningTeam", "OwningUser", "statecode", "statuscode", "TimeZoneRuleVersionNumber", "UTCConversionTimeZoneCode", "VersionNumber") 

df_program_types = spark.createDataFrame(df_program_types.collect(), schema=programTypeSchema)     
df_program_types.write.mode('overwrite').saveAsTable('program_types')
display(df_program_types)

In [None]:
contactSchema = StructType([
                        StructField("createdBy",StringType(),True),
                        StructField("createdOn",TimestampType(),True),
                        StructField("modifiedOn",TimestampType(),True),
                        StructField("modifiedBy",StringType(),True),
                        StructField("createdOnBehalfBy",StringType(),True),
                        StructField("modifiedOnBehalfBy",StringType(),True),
                        StructField("overriddenCreatedOn",TimestampType(),True),
                        StructField("importSequenceNumber",StringType(),True),
                        StructField("ownerId",StringType(),True),
                        StructField("ownerIdType",StringType(),True),
                        StructField("owningBusinessUnit",StringType(),True),
                        StructField("owningUser",StringType(),True),
                        StructField("owningTeam",StringType(),True),
                        StructField("timeZoneRuleVersionNumber",StringType(),True),
                        StructField("UTCConversionTimeZoneCode",StringType(),True),
                        StructField("versionNumber",StringType(),True),
                        StructField("contactId",StringType(),True),
                        StructField("customerSizeCode",StringType(),True),
                        StructField("customerSizeCode_display",StringType(),True),
                        StructField("customerTypeCode",StringType(),True),
                        StructField("customerTypeCode_display",StringType(),True),
                        StructField("preferredContactMethodCode",StringType(),True),
                        StructField("preferredContactMethodCode_display",StringType(),True),
                        StructField("leadSourceCode",StringType(),True),
                        StructField("leadSourceCode_display",StringType(),True),
                        StructField("paymentTermsCode",StringType(),True),
                        StructField("paymentTermsCode_display",StringType(),True),
                        StructField("shippingMethodCode",StringType(),True),
                        StructField("shippingMethodCode_display",StringType(),True),
                        StructField("accountId",StringType(),True),
                        StructField("participatesInWorkflow",StringType(),True),
                        StructField("isBackofficeCustomer",StringType(),True),
                        StructField("salutation",StringType(),True),
                        StructField("jobTitle",StringType(),True),
                        StructField("firstName",StringType(),True),
                        StructField("department",StringType(),True),
                        StructField("nickName",StringType(),True),
                        StructField("middleName",StringType(),True),
                        StructField("lastName",StringType(),True),
                        StructField("suffix",StringType(),True),
                        StructField("yomiFirstName",StringType(),True),
                        StructField("fullName",StringType(),True),
                        StructField("yomiMiddleName",StringType(),True),
                        StructField("yomiLastName",StringType(),True),
                        StructField("anniversary",StringType(),True),
                        StructField("birthDate",StringType(),True),
                        StructField("governmentId",StringType(),True),
                        StructField("yomiFullName",StringType(),True),
                        StructField("description",StringType(),True),
                        StructField("employeeId",StringType(),True),
                        StructField("genderCode",StringType(),True),
                        StructField("genderCode_display",StringType(),True),
                        StructField("annualIncome",StringType(),True),
                        StructField("hasChildrenCode",StringType(),True),
                        StructField("hasChildrenCode_display",StringType(),True),
                        StructField("educationCode",StringType(),True),
                        StructField("educationCode_display",StringType(),True),
                        StructField("webSiteUrl",StringType(),True),
                        StructField("familyStatusCode",StringType(),True),
                        StructField("familyStatusCode_display",StringType(),True),
                        StructField("ftpSiteUrl",StringType(),True),
                        StructField("EMailAddress1",StringType(),True),
                        StructField("spousesName",StringType(),True),
                        StructField("assistantName",StringType(),True),
                        StructField("EMailAddress2",StringType(),True),
                        StructField("assistantPhone",StringType(),True),
                        StructField("EMailAddress3",StringType(),True),
                        StructField("doNotPhone",StringType(),True),
                        StructField("managerName",StringType(),True),
                        StructField("managerPhone",StringType(),True),
                        StructField("doNotFax",StringType(),True),
                        StructField("doNotEMail",StringType(),True),
                        StructField("doNotPostalMail",StringType(),True),
                        StructField("doNotBulkEMail",StringType(),True),
                        StructField("doNotBulkPostalMail",StringType(),True),
                        StructField("accountRoleCode",StringType(),True),
                        StructField("accountRoleCode_display",StringType(),True),
                        StructField("territoryCode",StringType(),True),
                        StructField("territoryCode_display",StringType(),True),
                        StructField("creditLimit",StringType(),True),
                        StructField("creditOnHold",StringType(),True),
                        StructField("numberOfChildren",StringType(),True),
                        StructField("childrensNames",StringType(),True),
                        StructField("mobilePhone",StringType(),True),
                        StructField("pager",StringType(),True),
                        StructField("telephone1",StringType(),True),
                        StructField("telephone2",StringType(),True),
                        StructField("telephone3",StringType(),True),
                        StructField("fax",StringType(),True),
                        StructField("aging30",StringType(),True),
                        StructField("stateCode",StringType(),True),
                        StructField("stateCode_display",StringType(),True),
                        StructField("aging60",StringType(),True),
                        StructField("statusCode",StringType(),True),
                        StructField("statusCode_display",StringType(),True),
                        StructField("aging90",StringType(),True),
                        StructField("parentContactId",StringType(),True),
                        StructField("address1AddressId",StringType(),True),
                        StructField("address1AddressTypeCode",StringType(),True),
                        StructField("address1AddressTypeCode_display",StringType(),True),
                        StructField("address1Name",StringType(),True),
                        StructField("address1PrimaryContactName",StringType(),True),
                        StructField("address1Line1",StringType(),True),
                        StructField("address1Line2",StringType(),True),
                        StructField("address1Line3",StringType(),True),
                        StructField("address1City",StringType(),True),
                        StructField("address1StateOrProvince",StringType(),True),
                        StructField("address1County",StringType(),True),
                        StructField("address1Country",StringType(),True),
                        StructField("address1PostOfficeBox",StringType(),True),
                        StructField("address1PostalCode",StringType(),True),
                        StructField("address1UTCOffset",StringType(),True),
                        StructField("address1FreightTermsCode",StringType(),True),
                        StructField("address1FreightTermsCode_display",StringType(),True),
                        StructField("address1UPSZone",StringType(),True),
                        StructField("address1Latitude",StringType(),True),
                        StructField("address1Telephone1",StringType(),True),
                        StructField("address1Longitude",StringType(),True),
                        StructField("address1ShippingMethodCode",StringType(),True),
                        StructField("address1ShippingMethodCode_display",StringType(),True),
                        StructField("address1Telephone2",StringType(),True),
                        StructField("address1Telephone3",StringType(),True),
                        StructField("address1Fax",StringType(),True),
                        StructField("address1Composite",StringType(),True),
                        StructField("preferredSystemUserId",StringType(),True),
                        StructField("masterId",StringType(),True),
                        StructField("preferredAppointmentDayCode",StringType(),True),
                        StructField("preferredAppointmentDayCode_display",StringType(),True),
                        StructField("preferredAppointmentTimeCode",StringType(),True),
                        StructField("preferredAppointmentTimeCode_display",StringType(),True),
                        StructField("doNotSendMM",StringType(),True),
                        StructField("parentCustomerId",StringType(),True),
                        StructField("parentCustomerIdType",StringType(),True),
                        StructField("merged",StringType(),True),
                        StructField("externalUserIdentifier",StringType(),True),
                        StructField("lastUsedInCampaign",StringType(),True),
                        StructField("transactionCurrencyId",StringType(),True),
                        StructField("exchangeRate",StringType(),True),
                        StructField("annualIncomeBase",StringType(),True),
                        StructField("creditLimitBase",StringType(),True),
                        StructField("aging60Base",StringType(),True),
                        StructField("aging90Base",StringType(),True),
                        StructField("aging30Base",StringType(),True),
                        StructField("stageId",StringType(),True),
                        StructField("processId",StringType(),True),
                        StructField("entityImageId",StringType(),True),
                        StructField("traversedPath",StringType(),True),
                        StructField("SLAId",StringType(),True),
                        StructField("SLAInvokedId",StringType(),True),
                        StructField("onHoldTime",StringType(),True),
                        StructField("lastOnHoldTime",StringType(),True),
                        StructField("followEmail",StringType(),True),
                        StructField("timeSpentByMeOnEmailAndMeetings",StringType(),True),
                        StructField("business2",StringType(),True),
                        StructField("callback",StringType(),True),
                        StructField("company",StringType(),True),
                        StructField("home2",StringType(),True),
                        StructField("createdByExternalParty",StringType(),True),
                        StructField("modifiedByExternalParty",StringType(),True),
                        StructField("marketingOnly",StringType(),True), 
                        StructField("Ethnicity", StringType(), True)
                ])

sql_str =  '''select* from loaddata_contacts where Id is not null'''
df_contacts = spark.sql(sql_str)

df_contacts = df_contacts.withColumn("createdOn",lit(None)) \
                .withColumn("modifiedOn",lit(None)) \
                .withColumn("modifiedBy",lit(None)) \
                .withColumn("createdOnBehalfBy",lit(None)) \
                .withColumn("modifiedOnBehalfBy",lit(None)) \
                .withColumn("overriddenCreatedOn",lit(None)) \
                .withColumn("importSequenceNumber",lit(None)) \
                .withColumn("ownerId",lit(None)) \
                .withColumn("ownerIdType",lit(None)) \
                .withColumn("owningBusinessUnit",lit(None)) \
                .withColumn("owningUser",lit(None)) \
                .withColumn("owningTeam",lit(None)) \
                .withColumn("timeZoneRuleVersionNumber",lit(None)) \
                .withColumn("UTCConversionTimeZoneCode",lit(None)) \
                .withColumn("versionNumber",lit(None)) \
                .withColumnRenamed("Id", "contactId") \
                .withColumn("customerSizeCode",lit(None)) \
                .withColumn("customerSizeCode_display",lit(None)) \
                .withColumn("customerTypeCode",lit(None)) \
                .withColumn("customerTypeCode_display",lit(None)) \
                .withColumn("preferredContactMethodCode",lit(None)) \
                .withColumn("preferredContactMethodCode_display",lit(None)) \
                .withColumn("leadSourceCode",lit(None)) \
                .withColumn("leadSourceCode_display",lit(None)) \
                .withColumn("paymentTermsCode",lit(None)) \
                .withColumn("paymentTermsCode_display",lit(None)) \
                .withColumn("shippingMethodCode",lit(None)) \
                .withColumn("shippingMethodCode_display",lit(None)) \
                .withColumn("accountId",lit(None)) \
                .withColumn("participatesInWorkflow",lit(None)) \
                .withColumn("isBackofficeCustomer",lit(None)) \
                .withColumn("salutation",lit(None)) \
                .withColumn("jobTitle",lit(None)) \
                .withColumn("firstName",lit(None)) \
                .withColumn("department",lit(None)) \
                .withColumn("nickName",lit(None)) \
                .withColumn("middleName",lit(None)) \
                .withColumn("lastName",lit(None)) \
                .withColumn("suffix",lit(None)) \
                .withColumn("yomiFirstName",lit(None)) \
                .withColumn("yomiMiddleName",lit(None)) \
                .withColumn("yomiLastName",lit(None)) \
                .withColumn("anniversary",lit(None)) \
                .withColumn("birthDate",lit(None)) \
                .withColumn("governmentId",lit(None)) \
                .withColumn("yomiFullName",lit(None)) \
                .withColumnRenamed('Name', 'fullName') \
                .withColumn("description",lit(None)) \
                .withColumn("employeeId",lit(None)) \
                .withColumnRenamed('Gender', 'genderCode') \
                .withColumn("genderCode_display",lit(None)) \
                .withColumn("annualIncome",lit(None)) \
                .withColumn("hasChildrenCode",lit(None)) \
                .withColumn("hasChildrenCode_display",lit(None)) \
                .withColumn("educationCode",lit(None)) \
                .withColumn("educationCode_display",lit(None)) \
                .withColumn("webSiteUrl",lit(None)) \
                .withColumn("familyStatusCode",lit(None)) \
                .withColumn("familyStatusCode_display",lit(None)) \
                .withColumn("ftpSiteUrl",lit(None)) \
                .withColumn("EMailAddress1",lit(None)) \
                .withColumn("spousesName",lit(None)) \
                .withColumn("assistantName",lit(None)) \
                .withColumn("EMailAddress2",lit(None)) \
                .withColumn("assistantPhone",lit(None)) \
                .withColumn("EMailAddress3",lit(None)) \
                .withColumn("doNotPhone",lit(None)) \
                .withColumn("managerName",lit(None)) \
                .withColumn("managerPhone",lit(None)) \
                .withColumn("doNotFax",lit(None)) \
                .withColumn("doNotEMail",lit(None)) \
                .withColumn("doNotPostalMail",lit(None)) \
                .withColumn("doNotBulkEMail",lit(None)) \
                .withColumn("doNotBulkPostalMail",lit(None)) \
                .withColumn("accountRoleCode",lit(None)) \
                .withColumn("accountRoleCode_display",lit(None)) \
                .withColumn("territoryCode",lit(None)) \
                .withColumn("territoryCode_display",lit(None)) \
                .withColumn("creditLimit",lit(None)) \
                .withColumn("creditOnHold",lit(None)) \
                .withColumn("numberOfChildren",lit(None)) \
                .withColumn("childrensNames",lit(None)) \
                .withColumn("mobilePhone",lit(None)) \
                .withColumn("pager",lit(None)) \
                .withColumn("telephone1",lit(None)) \
                .withColumn("telephone2",lit(None)) \
                .withColumn("telephone3",lit(None)) \
                .withColumn("fax",lit(None)) \
                .withColumn("aging30",lit(None)) \
                .withColumn("stateCode",lit(None)) \
                .withColumn("stateCode_display",lit(None)) \
                .withColumn("aging60",lit(None)) \
                .withColumn("statusCode",lit(None)) \
                .withColumn("statusCode_display",lit(None)) \
                .withColumn("aging90",lit(None)) \
                .withColumn("parentContactId",lit(None)) \
                .withColumn("address1AddressId",lit(None)) \
                .withColumn("address1AddressTypeCode",lit(None)) \
                .withColumn("address1AddressTypeCode_display",lit(None)) \
                .withColumn("address1Name",lit(None)) \
                .withColumn("address1PrimaryContactName",lit(None)) \
                .withColumn("address1Line1",lit(None)) \
                .withColumn("address1Line2",lit(None)) \
                .withColumn("address1Line3",lit(None)) \
                .withColumnRenamed("City", "address1City") \
                .withColumnRenamed("State","address1StateOrProvince") \
                .withColumn("address1County",lit(None)) \
                .withColumnRenamed("Country", "address1Country") \
                .withColumn("address1PostOfficeBox",lit(None)) \
                .withColumnRenamed("ZipCode", "address1PostalCode") \
                .withColumn("address1UTCOffset",lit(None)) \
                .withColumn("address1FreightTermsCode",lit(None)) \
                .withColumn("address1FreightTermsCode_display",lit(None)) \
                .withColumn("address1UPSZone",lit(None)) \
                .withColumn("address1Latitude",lit(None)) \
                .withColumn("address1Telephone1",lit(None)) \
                .withColumn("address1Longitude",lit(None)) \
                .withColumn("address1ShippingMethodCode",lit(None)) \
                .withColumn("address1ShippingMethodCode_display",lit(None)) \
                .withColumn("address1Telephone2",lit(None)) \
                .withColumn("address1Telephone3",lit(None)) \
                .withColumn("address1Fax",lit(None)) \
                .withColumn("address1Composite",lit(None)) \
                .withColumn("preferredSystemUserId",lit(None)) \
                .withColumn("masterId",lit(None)) \
                .withColumn("preferredAppointmentDayCode",lit(None)) \
                .withColumn("preferredAppointmentDayCode_display",lit(None)) \
                .withColumn("preferredAppointmentTimeCode",lit(None)) \
                .withColumn("preferredAppointmentTimeCode_display",lit(None)) \
                .withColumn("doNotSendMM",lit(None)) \
                .withColumn("parentCustomerId",lit(None)) \
                .withColumn("parentCustomerIdType",lit(None)) \
                .withColumn("merged",lit(None)) \
                .withColumn("externalUserIdentifier",lit(None)) \
                .withColumn("lastUsedInCampaign",lit(None)) \
                .withColumn("transactionCurrencyId",lit(None)) \
                .withColumn("exchangeRate",lit(None)) \
                .withColumn("annualIncomeBase",lit(None)) \
                .withColumn("creditLimitBase",lit(None)) \
                .withColumn("aging60Base",lit(None)) \
                .withColumn("aging90Base",lit(None)) \
                .withColumn("aging30Base",lit(None)) \
                .withColumn("stageId",lit(None)) \
                .withColumn("processId",lit(None)) \
                .withColumn("entityImageId",lit(None)) \
                .withColumn("traversedPath",lit(None)) \
                .withColumn("SLAId",lit(None)) \
                .withColumn("SLAInvokedId",lit(None)) \
                .withColumn("onHoldTime",lit(None)) \
                .withColumn("lastOnHoldTime",lit(None)) \
                .withColumn("followEmail",lit(None)) \
                .withColumn("timeSpentByMeOnEmailAndMeetings",lit(None)) \
                .withColumn("business2",lit(None)) \
                .withColumn("callback",lit(None)) \
                .withColumn("company",lit(None)) \
                .withColumn("home2",lit(None)) \
                .withColumn("createdByExternalParty",lit(None)) \
                .withColumn("modifiedByExternalParty",lit(None)) \
                .withColumn("marketingOnly",lit(None)) \
                .select("createdBy","createdOn","modifiedOn","modifiedBy","createdOnBehalfBy","modifiedOnBehalfBy",
"overriddenCreatedOn","importSequenceNumber","ownerId","ownerIdType","owningBusinessUnit","owningUser","owningTeam",
"timeZoneRuleVersionNumber","UTCConversionTimeZoneCode","versionNumber","contactId","customerSizeCode","customerSizeCode_display",
"customerTypeCode","customerTypeCode_display","preferredContactMethodCode","preferredContactMethodCode_display","leadSourceCode",
"leadSourceCode_display","paymentTermsCode","paymentTermsCode_display","shippingMethodCode","shippingMethodCode_display","accountId",
"participatesInWorkflow","isBackofficeCustomer","salutation","jobTitle","firstName","department","nickName","middleName","lastName",
"suffix","yomiFirstName","fullName","yomiMiddleName","yomiLastName","anniversary","birthDate","governmentId","yomiFullName",
"description","employeeId","genderCode","genderCode_display","annualIncome","hasChildrenCode","hasChildrenCode_display",
"educationCode","educationCode_display","webSiteUrl","familyStatusCode","familyStatusCode_display","ftpSiteUrl","EMailAddress1",
"spousesName","assistantName","EMailAddress2","assistantPhone","EMailAddress3","doNotPhone","managerName","managerPhone","doNotFax",
"doNotEMail","doNotPostalMail","doNotBulkEMail","doNotBulkPostalMail","accountRoleCode","accountRoleCode_display","territoryCode",
"territoryCode_display","creditLimit","creditOnHold","numberOfChildren","childrensNames","mobilePhone","pager","telephone1",
"telephone2","telephone3","fax","aging30","stateCode","stateCode_display","aging60","statusCode","statusCode_display","aging90",
"parentContactId","address1AddressId","address1AddressTypeCode","address1AddressTypeCode_display","address1Name",
"address1PrimaryContactName","address1Line1","address1Line2","address1Line3","address1City","address1StateOrProvince",
"address1County","address1Country","address1PostOfficeBox","address1PostalCode","address1UTCOffset","address1FreightTermsCode",
"address1FreightTermsCode_display","address1UPSZone","address1Latitude","address1Telephone1","address1Longitude",
"address1ShippingMethodCode","address1ShippingMethodCode_display","address1Telephone2","address1Telephone3","address1Fax",
"address1Composite","preferredSystemUserId","masterId","preferredAppointmentDayCode","preferredAppointmentDayCode_display",
"preferredAppointmentTimeCode","preferredAppointmentTimeCode_display","doNotSendMM","parentCustomerId","parentCustomerIdType",
"merged","externalUserIdentifier","lastUsedInCampaign","transactionCurrencyId","exchangeRate","annualIncomeBase",
"creditLimitBase","aging60Base","aging90Base","aging30Base","stageId","processId","entityImageId","traversedPath","SLAId",
"SLAInvokedId","onHoldTime","lastOnHoldTime","followEmail","timeSpentByMeOnEmailAndMeetings","business2","callback","company",
"home2","createdByExternalParty","modifiedByExternalParty","marketingOnly", "Ethnicity")
df_contacts = spark.createDataFrame(df_contacts.collect(), schema=contactSchema) 
display(df_contacts)
df_contacts.write.mode('overwrite').saveAsTable('contacts')

In [None]:
programParticipentSchema = StructType([
                                StructField("CreatedBy",StringType(),True),
                                StructField("CreatedOn",TimestampType(),True),
                                StructField("CreatedOnBehalfBy",StringType(),True),
                                StructField("ImportSequenceNumber",IntegerType(),True),
                                StructField("ModifiedBy",StringType(),True),
                                StructField("ModifiedOn",TimestampType(),True),
                                StructField("ModifiedOnBehalfBy",StringType(),True),
                                StructField("EndDate",TimestampType(),True),
                                StructField("ExternalIdentifier",StringType(),True),
                                StructField("ExternalSystem",StringType(),True),
                                StructField("Name",StringType(),True),
                                StructField("Note",StringType(),True),
                                StructField("Program",StringType(),True),
                                StructField("ProgramParticipantId",StringType(),True),
                                StructField("Recipient",StringType(),True),
                                StructField("StartDate",TimestampType(),True),
                                StructField("OverriddenCreatedOn",TimestampType(),True),
                                StructField("OwnerId",StringType(),True),
                                StructField("OwningBusinessUnit",StringType(),True),
                                StructField("OwningTeam",StringType(),True),
                                StructField("OwningUser",StringType(),True),
                                StructField("statecode",StringType(),True),
                                StructField("statuscode",StringType(),True),
                                StructField("TimeZoneRuleVersionNumber",IntegerType(),True),
                                StructField("UTCConversionTimeZoneCode",IntegerType(),True),
                                StructField("VersionNumber",IntegerType(),True), 
                                StructField("Amount", DoubleType(), True)
                    ])

sql_str =  '''select * from loaddata_program_participants where programParticipantId is not null'''
df_program_participants = spark.sql(sql_str)


df_program_participants = df_program_participants.withColumn("CreatedOn",lit(None)) \
                .withColumn("CreatedOnBehalfBy",lit(None)) \
                .withColumn("ImportSequenceNumber",lit(None)) \
                .withColumn("ModifiedBy",lit(None)) \
                .withColumn("ModifiedOn",lit(None)) \
                .withColumn("ModifiedOnBehalfBy",lit(None)) \
                .withColumn("EndDate",lit(None)) \
                .withColumn("ExternalIdentifier",lit(None)) \
                .withColumn("ExternalSystem",lit(None)) \
                .withColumn("Name",lit(None)) \
                .withColumn("Note",lit(None)) \
                .withColumnRenamed("ProgramId", "Program") \
                .withColumn("Recipient",lit(None)) \
                .withColumn("StartDate",lit(None)) \
                .withColumn("OverriddenCreatedOn",lit(None)) \
                .withColumn("OwnerId",lit(None)) \
                .withColumn("OwningBusinessUnit",lit(None)) \
                .withColumn("OwningTeam",lit(None)) \
                .withColumn("OwningUser",lit(None)) \
                .withColumn("statecode",lit(None)) \
                .withColumn("statuscode",lit(None)) \
                .withColumn("TimeZoneRuleVersionNumber",lit(None)) \
                .withColumn("UTCConversionTimeZoneCode",lit(None)) \
                .withColumn("VersionNumber",lit(None)) 

df_program_participants = df_program_participants.select("CreatedBy","CreatedOn","CreatedOnBehalfBy","ImportSequenceNumber",
"ModifiedBy","ModifiedOn","ModifiedOnBehalfBy","EndDate","ExternalIdentifier",
"ExternalSystem","Name","Note","Program","ProgramParticipantId","Recipient","StartDate",
"OverriddenCreatedOn","OwnerId","OwningBusinessUnit","OwningTeam","OwningUser","statecode",
"statuscode","TimeZoneRuleVersionNumber","UTCConversionTimeZoneCode","VersionNumber", "Amount")

df_program_participants = spark.createDataFrame(df_program_participants.collect(), schema=programParticipentSchema) 
display(df_program_participants)
df_program_participants.write.mode('overwrite').saveAsTable('program_participants')

In [None]:
sql_str =  '''select * from loaddata_programtype_metrics_outcomes where ProgramTypeId is not Null'''
df_programtype_metrics_outcomes = spark.sql(sql_str)
df_programtype_metrics_outcomes.write.mode('overwrite').saveAsTable('programtype_metrics_outcomes')
# display(df_programtype_metrics_outcomes)

In [None]:
sql_str =  '''select * from loaddata_program_metrics_outcomes'''
df_program_metrics_outcomes = spark.sql(sql_str)
df_program_metrics_outcomes.write.mode('overwrite').saveAsTable('program_metrics_outcomes')
# display(df_program_metrics_outcomes)

In [None]:
# create empty table for measures and save to SAdata folder

col_lst = ['Measures']
values = [[0]]

df = spark.createDataFrame(values, col_lst)
df.write.mode('overwrite').saveAsTable('AllMeasures')