Copyright (c) Microsoft Corporation.

Licensed under the MIT License.

# Library Imports

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from notebookutils import mssparkutils
import json

# Read in Data from Azure Data Lake

In [None]:
# Storage / workspace settings that the cells below interpolate into
# abfss:// and https:// paths. The two empty values are left blank here.
data_lake_account_name = ""  # Synapse Workspace ADLS
file_system_name = "data"
synapse_workspace_name = ""


In [None]:
# Create the target database only when it is missing. Using IF NOT EXISTS
# (instead of catching every exception) lets genuine failures such as
# permission or metastore errors surface rather than being reported as
# "Database already exists".
spark.sql("CREATE DATABASE IF NOT EXISTS c360_data")


In [None]:
# Read sourcedata/properties.csv (header row, inferred schema) and store it
# as the table c360_data.properties, replacing any previous contents.
filename = 'properties'
filebasepath = f'abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/sourcedata/'
properties_csv = f'{filebasepath}{filename.lower()}.csv'
df = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True)
    .load(properties_csv)
)
df.write.mode("overwrite").saveAsTable(f"c360_data.{filename}")

In [None]:
# Columns that receive an explicit type, grouped by target Spark type.
id_columns = ['cid','pid','ptid','uid','utid','lid','sid','wid','paymentid','PostCode']
date_columns = ['DateOfBirth']
double_columns = ['LeaseTerm','answer']

# Source CSV files that are loaded and saved as tables in c360_data.
filenames = ['residents_source1','residents_source2','leases','payments','workorders','surveys']

filebasepath = f'abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/sourcedata/'
dfspath = 'https://' + data_lake_account_name + '.dfs.core.windows.net/' + file_system_name + '/sourcedata/'

# Cast passes applied in this order: ids -> string, dates -> timestamp,
# numeric measures -> double.
cast_plan = (
    (id_columns, StringType()),
    (date_columns, TimestampType()),
    (double_columns, DoubleType()),
)

for filename in filenames:
    csv_path = f'{filebasepath}{filename.lower()}.csv'
    df = spark.read.load(csv_path, format='csv', header=True, inferSchema=True)

    # Left-pad PostCode with zeros to five characters before the string cast.
    if 'PostCode' in df.columns:
        df = df.withColumn("PostCode", lpad(df["PostCode"], 5, '0'))

    for wanted_names, spark_type in cast_plan:
        for column_name in df.columns:
            if column_name not in wanted_names:
                continue
            df = df.withColumn(column_name, df[column_name].cast(spark_type))

    df.write.mode("overwrite").saveAsTable(f"c360_data.{filename}")
    

In [None]:
def get_ci_datedefinitions():
    """Build the formatting trait definitions attached to manifest entities.

    Returns:
        A freshly built list of four dicts, in this order: the root
        ``is.formatted`` trait, then the ``dateTime``, ``date`` and ``time``
        traits. Each of the last three extends the root trait and declares a
        single ``format`` parameter with a default pattern.
    """
    root_trait = "is.formatted"

    def formatted_trait(suffix, explanation, default_format):
        # One trait that extends the root and exposes a "format" parameter.
        return {
            "traitName": root_trait + "." + suffix,
            "extendsTrait": root_trait,
            "explanation": explanation,
            "hasParameters": [{
                "name": "format",
                "dataType": "stringFormat",
                "defaultValue": default_format
            }]
        }

    return [
        {
            "traitName": root_trait,
            "extendsTrait": "is",
            "explanation": "a root for traits that descibe how data is formatted"
        },
        formatted_trait(
            "dateTime",
            "DateTime data formatted as a string in ISO 8601 format",
            "MM/DD/YYYY hh:mm",
        ),
        formatted_trait(
            "date",
            "Date data formatted as a string in ISO 8601 format",
            "MM/DD/YYYY",
        ),
        formatted_trait(
            "time",
            "Time data formatted as a string in ISO 8601 format",
            "hh:mm:ss",
        ),
    ]


In [None]:
# Create one <entity>.cdm.json per table plus default.manifest.cdm.json.

filenames = ['residents_source1','residents_source2','leases','payments','workorders','surveys']

filebasepath = f'abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/synapse/workspaces/{synapse_workspace_name}/warehouse/c360_data.db/'
dfspath = 'https://' + data_lake_account_name + '.dfs.core.windows.net/' + file_system_name + '/synapse/workspaces/' + synapse_workspace_name + '/warehouse/c360_data.db/'
id_columns = ['cid','pid','ptid','uid','utid','lid','wid','sid','paymentid','CustomerId']
curreny_columns = ['amount','RenewalPredictionScore','LeaseTerm','answer']

# Staging area for the generated JSON documents.
# NOTE(review): Spark writes through the fully qualified abfss:// URI while
# mssparkutils is given the relative '/c360data/...' paths; this presumably
# relies on file_system_name being the notebook's default file system - confirm.
entfilebasepath = f'abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/c360data/'
entjsonpath = entfilebasepath + 'tempfolder'
temp_folder = '/c360data/tempfolder'


def _cdm_data_format(field, id_names, double_names):
    """Return the dataFormat string used in the entity definition for a field.

    Name-based overrides win: names in ``id_names`` map to 'String' and names
    in ``double_names`` map to 'Double'. Otherwise the Spark type decides:
    timestamp -> 'DateTime', date -> 'Date', integer -> 'Int32', and anything
    else -> 'String'.

    The type is checked with isinstance rather than by comparing
    ``str(field.dataType)`` to a class name, because the string form of Spark
    data types is not stable across PySpark versions ('TimestampType' vs
    'TimestampType()'), which would silently turn every column into 'String'.
    """
    if field.name in id_names:
        return 'String'
    if field.name in double_names:
        return 'Double'
    if isinstance(field.dataType, TimestampType):
        return 'DateTime'
    if isinstance(field.dataType, DateType):
        return 'Date'
    if isinstance(field.dataType, IntegerType):
        return 'Int32'
    return 'String'


def _attribute_definition(name, data_format):
    """Return the hasAttributes entry for one column.

    DateTime and Date columns additionally reference the matching
    is.formatted.* trait; all other formats carry only name and dataFormat.
    """
    trait_by_format = {
        'DateTime': "is.formatted.dateTime",
        'Date': "is.formatted.date",
    }
    if data_format in trait_by_format:
        return {
            "name": name,
            "appliedTraits": [trait_by_format[data_format]],
            "dataFormat": data_format
        }
    return {
        "name": name,
        "dataFormat": data_format,
    }


def _write_single_json(model, target_path):
    """Serialise ``model`` to JSON and store it as the single file ``target_path``.

    The document is written through a one-row Spark DataFrame, which produces a
    folder of part files; the part file is then copied to ``target_path`` and
    the temporary folder is removed (also when the copy fails).

    Raises:
        RuntimeError: if Spark produced no '.json' part file to copy.
    """
    json_df = spark.read.json(sc.parallelize([json.dumps(model)]))
    json_df.coalesce(1).write.format('json').mode('overwrite').save(entjsonpath)
    try:
        for entry in mssparkutils.fs.ls(temp_folder):
            # endswith (not a substring test) so sidecar files whose names
            # merely contain '.json' are never picked up.
            if entry.name.endswith('.json'):
                mssparkutils.fs.cp(temp_folder + '/' + entry.name, target_path, True)
                break
        else:
            raise RuntimeError('No .json part file found in ' + temp_folder)
    finally:
        # Delete the folder holding the part files.
        mssparkutils.fs.rm(temp_folder, recurse=True)


# Write resolve/<entity>.cdm.json for every table.
for filename in filenames:
    # header/inferSchema are CSV reader options and are not needed for parquet.
    table_df = spark.read.load(filebasepath + filename.lower(), format='parquet')
    attributes = []
    contents = []
    for field in table_df.schema:
        data_format = _cdm_data_format(field, id_columns, curreny_columns)
        attributes.append(_attribute_definition(field.name, data_format))
        contents.append({
            "type" : "attributeDefinition",
            "name" : field.name,
            "parent" : filename + "/attributeContext/" + filename,
            "definition" : "resolvedFrom/" + filename + "/hasAttributes/" + field.name,
            "contents" : [
              filename + "/hasAttributes/" + field.name
            ]
        })

    ent_imports = [{
        "corpusPath": '/' + filename + ".cdm.json",
        "moniker": "resolvedFrom"
    }]
    ent_definitions = [{
        "entityName": filename,
        "attributeContext": {
            "type": "entity",
            "name": filename,
            "definition": "resolvedFrom/" + filename,
            "contents": contents
        },
        "hasAttributes": attributes,
        "version": "1.0.0.0"
    }]
    entity_model = {"jsonSchemaSemanticVersion": "1.1.0","imports":ent_imports, "definitions":ent_definitions}

    _write_single_json(entity_model, '/c360data/resolve/' + filename + '.cdm.json')


# Create the main manifest file that lists every entity and where its
# parquet data partitions are stored.
imports = []
entities = []

for filename in filenames:
    entities.append({
        "type": "LocalEntity",
        "entityName": filename,
        "entityPath": "resolve/" + filename + '.cdm.json/' + filename,
        "dataPartitionPatterns": [
            {
                "name": filename,
                "rootLocation": 'synapse/workspaces/' + synapse_workspace_name + '/warehouse/c360_data.db/' + filename,
                "regularExpression": ".+\\.parquet$",
                "parameters": [],
                "exhibitsTraits": [
                    {"traitReference" : "is.partition.format.parquet"}
                ]
            }
        ],
        "definitions": get_ci_datedefinitions()
    })

manifest = {"manifestName": "default","entities": entities,"jsonSchemaSemanticVersion": "1.1.0","imports":imports}

_write_single_json(manifest, '/c360data/' + 'default.manifest.cdm.json')


In [None]:
# Remove any previously published manifest and entity folder. Each removal
# is attempted on its own: with both calls in one try block, a failure on the
# first path would skip the second and leave a stale '/resolve' folder behind.
for stale_path, recursive in (('/default.manifest.cdm.json', False), ('/resolve', True)):
    try:
        mssparkutils.fs.rm(stale_path, recurse=recursive)
    except Exception as err:
        # Best-effort cleanup: keep going, but make the reason visible.
        print(f"Skipped removing {stale_path}: {err}")

# Publish the generated manifest from the staging folder.
srcfilename = '/c360data/default.manifest.cdm.json'
targetfilename = '/default.manifest.cdm.json'
mssparkutils.fs.cp(srcfilename,targetfilename, True)

# Publish the per-entity definition files (third argument True = recursive copy).
srcfilename = '/c360data/resolve'
targetfilename = '/resolve'
mssparkutils.fs.cp(srcfilename,targetfilename, True)

# Drop the staging folder now that its contents have been copied.
mssparkutils.fs.rm('/c360data',recurse=True)