Copyright (c) Microsoft Corporation.

Licensed under the MIT License.

In [None]:
# Notebook parameters. Every abfss:// path in this notebook is built as
# abfss://<file_system_name>@<data_lake_account_name>.dfs.core.windows.net/...,
# so both values must be set (they are empty placeholders here).
data_lake_account_name, file_system_name = '', ''

In [None]:
# Load the raw complaints CSV from ADLS and save it as default.complaints_data.
# escape='"' with multiLine=True lets quoted fields contain escaped quotes and
# line breaks. The hyphenated "Sub-product"/"Sub-issue" columns are renamed to
# subproduct/subissue, the names the SQL in later cells selects.
raw_complaints_path = f"abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/data/complaints.csv"
df_complaints = (
    spark.read.format("csv")
    .load(raw_complaints_path, header=True, escape='"', multiLine=True)
    .withColumnRenamed("Sub-product", "subproduct")
    .withColumnRenamed("Sub-issue", "subissue")
)
df_complaints.write.mode("overwrite").saveAsTable("default.complaints_data")

In [None]:
from pyspark.sql.functions import *
from datetime import date, datetime
import pandas as pd
import numpy as np
from pyspark.sql.window import Window

# Build the fictitious customer table: exactly one customer per complaint in
# default.complaints_data. Each customer gets an email address derived from the
# name and today's date. The result is written to ADLS as CSV and saved as the
# default.customers Spark table.
df_names = spark.read.format("csv").load(f"abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/data/fictitious_customer_names.csv",header=True)
df_names = df_names.select("*", concat(col("FirstName"),lit("."),col("LastName"),lit("@contoso.com")).alias("email"))
# Strip spaces from the address and lowercase it.
df_names = df_names.withColumn('email', lower(regexp_replace(col("email"), " ", "")))

# Stamp every row with today's date as a "YYYY-MM-DD" string.
df_names = df_names.withColumn("created_date", lit(datetime.now().strftime("%Y-%m-%d")))

df_names = df_names.select('Name','created_date','email')
cols = ['name','created_date','email']
df_names = df_names.toDF(*cols)

# Number of complaints = number of customers to generate (plain Python int).
ccount = spark.sql('''select count(*) as ccount from default.complaints_data''').first()['ccount']

# Cycle through the fictitious names until there is one per complaint.
df_names = df_names.toPandas()
if df_names.empty:
    raise ValueError("fictitious_customer_names.csv has no rows; cannot assign customers to complaints")
# Integer copy count (floor division + 1 guarantees at least ccount rows).
# Tile the whole list rather than repeating each row in place, so head(ccount)
# keeps every name instead of dropping the names at the end of the file.
n_copies = ccount // len(df_names) + 1
df_names = pd.concat([df_names] * n_copies, ignore_index=True).head(ccount)
# Shuffle so customers are not assigned to complaints in name order.
df_names = df_names.sample(frac= 1)

df_names = spark.createDataFrame(df_names)
# coalesce(1) puts all rows in one partition, so monotonically_increasing_id()
# yields consecutive ids 0..ccount-1.
df_names = df_names.coalesce(1).withColumn("customer_id", monotonically_increasing_id())


df_names.write.option('header', 'true').mode('overwrite').csv(f'abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/data/customers/')
df_names.write.mode("overwrite").saveAsTable("default.customers")


# Link customers to complaints: the n-th customer_id is paired with the n-th
# complaint_id. pd.concat(axis=1) aligns on the index, and both frames come from
# toPandas() with a default 0..n-1 index, so the pairing is purely positional.
# The customers table is built with one row per complaint, so both frames are
# expected to have the same length.
df_cust = spark.sql('''select customer_id from default.customers''').toPandas()
df_complaints = spark.sql('''select complaint_id from default.complaints_data''').toPandas()
df_cust_compl_links = pd.concat([df_cust, df_complaints], axis=1)

# Persist the links to ADLS as CSV (with header) and as a Spark table.
df_cust_compl_links_sp = spark.createDataFrame(df_cust_compl_links)
links_csv_path = f'abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/data/customers_complaints/'
df_cust_compl_links_sp.write.option('header', 'true').mode('overwrite').csv(links_csv_path)
df_cust_compl_links_sp.write.mode("overwrite").saveAsTable("default.customers_complaints")

# Build the employee roster from fictitious names. Each employee gets a derived
# email address and a randomly chosen department, and only the two employees
# with the highest provisional id are kept per department. The result is written
# to ADLS as CSV and saved as the default.employees Spark table.
emp_departments = ["Credit Reporting", "Debt Collection", "Banking Services",
                   "Card Services", "Loans"]

df_emp = spark.read.format("csv").load(f"abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/data/fictitious_employee_names.csv",header=True)
emp_email = concat(col("FirstName"), lit("."), col("LastName"), lit("@contoso.com"))
df_emp = (
    df_emp.select("*", emp_email.alias("email"))
    # Strip spaces and lowercase the address.
    .withColumn("email", lower(regexp_replace(col("email"), " ", "")))
    .withColumn("employee_id", monotonically_increasing_id())
    .withColumn("created_date", current_date())
    # rand() is in [0, 1), so the index into the five departments is 0..4.
    .withColumn("department",
                array(*[lit(dept) for dept in emp_departments]).getItem((rand() * 5).cast("int")))
)

# Rank employees inside each department by descending provisional id; keep the top two.
dept_window = Window.partitionBy(df_emp['department']).orderBy(df_emp['employee_id'].desc())
df_emp = (
    df_emp.select('*', rank().over(dept_window).alias('rank'))
    .filter(col('rank') <= 2)
    .select('Name', 'created_date', 'email', 'department', 'employee_id')
    .toDF('name', 'created_date', 'email', 'department', 'employee_id')
    # Re-number the survivors consecutively from a single partition.
    .coalesce(1)
    .withColumn("employee_id", monotonically_increasing_id())
)

# Persist to ADLS as CSV (with header) and as a Spark table.
employees_csv_path = f'abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/data/employees/'
df_emp.write.option('header', 'true').mode('overwrite').csv(employees_csv_path)
df_emp.write.mode("overwrite").saveAsTable("default.employees")

In [None]:
# Upload seed data (employees, initial complaints, initial responses) to Cosmos DB
# for the PowerApp, through the "CosmosDB" Synapse linked service.
from pyspark.sql.functions import col, lit, when


def _upsert_to_cosmos(sdf, container):
    """Write *sdf* to Cosmos DB *container* (upserts enabled, overwrite mode)."""
    (sdf.write.format("cosmos.oltp")
        .option("spark.synapse.linkedService", "CosmosDB")
        .option("spark.cosmos.container", container)
        .option("spark.cosmos.write.upsertEnabled", "true")
        .mode('overwrite')
        .save())


# Employees: re-read the CSV written to data/employees/ (no inferSchema, so all columns are strings).
df_emp = spark.read.format("csv").load(f"abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/data/employees/",header=True)
_upsert_to_cosmos(df_emp, "employees")

# Initial complaints, restricted to the five supported departments.
seed_departments = ['Credit Reporting','Debt Collection','Banking Services','Card Services','Loans']
df_initial_complaints = spark.read.format("csv").load(f"abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/data/initial_complaints.csv",header=True)
df_initial_complaints = df_initial_complaints.where(col("Department").isin(seed_departments))
_upsert_to_cosmos(df_initial_complaints, "complaints")

# Initial responses: reassign responses by 'Hailey Simmons' to 'Hannah Haynes'.
# Test for equality rather than `!= ... otherwise('Hannah Haynes')`: with `!=`,
# NULL agents evaluate to NULL, fall through to otherwise(), and would also be
# rewritten to 'Hannah Haynes'. This form leaves NULL agents NULL.
df_initial_responses = spark.read.format("csv").load(f"abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/data/initial_responses.csv",header=True)
support_agent = col('SupportAgent')
df_initial_responses = df_initial_responses.withColumn(
    'SupportAgent',
    when(support_agent == 'Hailey Simmons', lit('Hannah Haynes')).otherwise(support_agent),
)
_upsert_to_cosmos(df_initial_responses, "responses")

In [None]:
import re
import string

import numpy as np
import pandas as pd

# Pull the complaint fields used for modelling into pandas.
sql_statement = '''select Complaint_Id,Date_received,State,ZIP_code,Product,subproduct,Issue,subissue,Consumer_complaint_narrative from default.complaints_data'''
df = spark.sql(sql_statement).toPandas()

# Keep only rows where every selected field is populated.
df = df.dropna().reset_index(drop=True)

print(df.shape)
# Collapse the many product labels into the five department names (plus 'Other').
df.replace({'Product':
             {'Credit reporting, credit repair services, or other personal consumer reports': 'Credit Reporting',
              'Debt collection': 'Debt Collection',
              'Credit reporting': 'Credit Reporting',
              'Credit card': 'Card Services',
              'Bank account or service': 'Banking Services',
              'Credit card or prepaid card': 'Card Services',
              'Student loan': 'Loans',
              'Checking or savings account': 'Banking Services',
              'Consumer Loan': 'Loans',
              'Vehicle loan or lease': 'Loans',
              'Money transfer, virtual currency, or money service': 'Banking Services',
              'Payday loan, title loan, or personal loan': 'Loans',
              'Payday loan': 'Loans',
              'Money transfers': 'Banking Services',
              'Prepaid card': 'Card Services',
              'Other financial service': 'Other',
              'Virtual currency': 'Banking Services'}
            }, inplace= True)

products_list = ['Credit Reporting','Debt Collection','Banking Services','Card Services','Loans']
# .copy() so the column assignments below act on an independent frame, not a view.
df = df[df['Product'].isin(products_list)].copy()

df.columns = ['complaint_id','date_received','state','zipcode','product','subproduct','issue','subissue','complaint']

# Remove issues that don't have at least 1000 entries.
df = df[df.groupby('issue').complaint.transform(len) >= 1000]
df = df.reset_index(drop=True)

# Every ASCII punctuation character, escaped so ']', '\\', '^' and '-' are literal in the class.
_PUNCTUATION_RE = "[{}]".format(re.escape(string.punctuation))
# Redaction masks such as "XXXX" and "XX/XX/XXXX" (runs of two or more uppercase X).
_REDACTION_RE = r"X{2,}"


def _clean_complaint_text(text):
    """Normalise complaint narratives: drop punctuation and redaction masks,
    collapse whitespace to single spaces, lowercase, and strip the ends."""
    # pandas >= 2.0 defaults str.replace to regex=False; these are patterns.
    text = text.str.replace(_PUNCTUATION_RE, ' ', regex=True)
    # Masks are uppercase, so remove them before lowercasing.
    text = text.str.replace(_REDACTION_RE, ' ', regex=True)
    # Newlines/tabs/repeated spaces -> one space, so words on adjacent lines are not fused.
    text = text.str.replace(r"\s+", ' ', regex=True)
    return text.str.lower().str.strip()


df['complaint'] = _clean_complaint_text(df['complaint'])

# Drop complaints with fewer than five words.
df = df[df['complaint'].str.split().str.len() >= 5]

# Sample to get 2000 rows for each issue.
# NOTE(review): replace=True duplicates rows within each issue, so the 50/50
# train_test_split in the next cell can put identical complaints in both train
# and test. Confirm this leakage is acceptable.
df = df.groupby(['issue']).apply(lambda grp: grp.sample(n=2000,replace=True))

In [None]:
# Publish the prepared complaints three ways (all rows, a random 50% train split
# and the remaining 50% test split). Each is saved as a Spark table and as CSV
# with a header in ADLS.
# NOTE(review): this overwrites default.complaints_data (the raw table read at the
# top of the previous cell) with the prepared columns, so re-running the previous
# cell afterwards will fail on its raw column names.
df = df[['complaint_id', 'date_received', 'state', 'zipcode', 'product',
         'subproduct', 'issue', 'subissue', 'complaint']]


def _publish(pdf, table_name, csv_path):
    """Convert *pdf* to Spark, save it as *table_name*, write it as CSV to *csv_path*; return the Spark frame."""
    sdf = spark.createDataFrame(pdf)
    sdf.write.mode("overwrite").saveAsTable(table_name)
    sdf.write.option('header', 'true').mode('overwrite').csv(csv_path)
    return sdf


df_data_sp = _publish(df, "default.complaints_data",
                      f'abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/prepareddata/all/')

from sklearn.model_selection import train_test_split
df_train, df_test = train_test_split(df, test_size=0.5)

# Train split.
df_train = df_train.dropna().reset_index(drop=True)
df_data_sp = _publish(df_train, "default.complaints_train",
                      f'abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/prepareddata/train/')

# Test split.
df_test = df_test.dropna().reset_index(drop=True)
df_data_sp = _publish(df_test, "default.complaints_test",
                      f'abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/prepareddata/test/')

In [None]:
# Inspect the product (department) distribution of the training split.
# A notebook renders only a cell's last expression, so print both results
# explicitly instead of leaving value_counts() as a discarded statement.
print(df_train['product'].value_counts())
print(df_train['product'].unique())