<h2>Data Preprocessing

In [1]:
# Initializing spark session 
from pyspark.sql import SparkSession 
spark = SparkSession.builder.appName("Test").getOrCreate() 

# Import relevant packages 
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import * 
from pyspark.sql import DataFrame 
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/10/23 11:36:54 WARN Utils: Your hostname, Lanzias-MacBook-Pro.local, resolves to a loopback address: 127.0.0.1; using 10.102.129.66 instead (on interface en0)
25/10/23 11:36:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/23 11:36:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/10/23 11:36:56 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/10/23 11:36:56 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [2]:
# define data path
data_path = "/Users/lanzia/Desktop/Study/FS 2025/01.1st Semester/Introduction to Data in Business Analytics/Project/Data/"

In [3]:
# define the columns needed to be cleaned later
ranges = [(202319, 202326), (202401, 202426), (202501, 202518)]
numeric_fields = [StructField(str(i), DoubleType(), True) 
                  for start, end in ranges for i in range(start, end + 1)
                  ]
cols_to_clean = [str(i) for start, end in ranges for i in range(start, end + 1)]

<h4>Step 1: clean survey data

**Load the tables**

In [4]:
# survery data
btos_survey = pd.read_excel(data_path + "Sector.xlsx",  sheet_name="Response Estimates", engine="openpyxl")
btos_survey = spark.createDataFrame(btos_survey)
btos_survey.show(5, truncate=False)

25/10/23 11:37:03 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 0:>                                                          (0 + 1) / 1]

+------+-----------+--------------------------------------------------------------------+---------+-------------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+
|Sector|Question ID|Question                                                            |Answer ID|Answer       |202518|202517|202516|202515|202514|202513|202512|202511|202510|202509|202508|202507|202506|202505|202504|202503|202502|202501|202426|202425|202424|202423|202422|202421|202420|202419|202418|202417|202416|202415|202414|202413|202412|202411|202410|202409|202408|202407|202406|202405|202404|202403|202402|202401|202326|202325|202324|202323|202322|202321|202320|202319|
+------+-----------+------------------------

                                                                                

In [5]:
# Check the Schema
btos_survey.printSchema

<bound method DataFrame.printSchema of DataFrame[Sector: string, Question ID: double, Question: string, Answer ID: double, Answer: string, 202518: string, 202517: string, 202516: string, 202515: string, 202514: string, 202513: string, 202512: string, 202511: string, 202510: string, 202509: string, 202508: string, 202507: string, 202506: string, 202505: string, 202504: string, 202503: string, 202502: string, 202501: string, 202426: string, 202425: string, 202424: string, 202423: string, 202422: string, 202421: string, 202420: string, 202419: string, 202418: string, 202417: string, 202416: string, 202415: string, 202414: string, 202413: string, 202412: string, 202411: string, 202410: string, 202409: string, 202408: string, 202407: string, 202406: string, 202405: string, 202404: string, 202403: string, 202402: string, 202401: string, 202326: string, 202325: string, 202324: string, 202323: string, 202322: string, 202321: string, 202320: string, 202319: string]>

In [6]:
# filter the table to keep the rows related to AI usage question
btos_survey_filtered = (btos_survey
                   .filter((col('Question ID')==7) | (col('Question ID')==24))
                   .filter((col('Sector')!='XX'))
                   )

btos_survey_filtered.show(5, truncate=False)

+------+-----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+-----------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+
|Sector|Question ID|Question                                                                                                                                                                                                                                  |Answer ID|Answer     |202518|202517|202516|202515|202514|202513|202512|202511|202510|202509|202508|2025

In [7]:
# clean the columns
# for value with 'S', it's equivalent of nulls; convert the rest of percentage to float value
for c in cols_to_clean:
    if c in btos_survey_filtered.columns:
        btos_survey_filtered = (btos_survey_filtered
                                .withColumn(c, F.when(F.col(c).isin("S"), None)
                                            .otherwise(F.regexp_replace(F.trim(F.col(c)), "%", "").cast("double"))
                                            )
                                )

In [8]:
# sample print
btos_survey_filtered.show(10)

                                                                                

+------+-----------+--------------------+---------+-----------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+
|Sector|Question ID|            Question|Answer ID|     Answer|202518|202517|202516|202515|202514|202513|202512|202511|202510|202509|202508|202507|202506|202505|202504|202503|202502|202501|202426|202425|202424|202423|202422|202421|202420|202419|202418|202417|202416|202415|202414|202413|202412|202411|202410|202409|202408|202407|202406|202405|202404|202403|202402|202401|202326|202325|202324|202323|202322|202321|202320|202319|
+------+-----------+--------------------+---------+-----------+------+------+------+------+------+------+------+------+------+------+------+----

In [9]:
# map sector ID to names that match with S&P factset data
btos_survey_fnl = (btos_survey_filtered
                   .withColumnRenamed('Sector','subsector')
                   .withColumn('subsector_name', when(col('subsector')==11,  lit("Agriculture, Forestry, Fishing and Hunting"))
                                              .when(col('subsector')==21, lit("Mining, Quarrying, and Oil and Gas Extraction"))
                                              .when(col('subsector')==22, lit("Utilities"))
                                              .when(col('subsector')==23, lit("Construction"))
                                              .when(col('subsector')==31, lit("Manufacturing"))
                                              .when(col('subsector')==32, lit("Manufacturing"))
                                              .when(col('subsector')==33, lit("Manufacturing"))
                                              .when(col('subsector')==42, lit("Wholesale Trade"))
                                              .when(col('subsector')==44, lit("Retail Trade"))
                                              .when(col('subsector')==45, lit("Retail Trade"))
                                              .when(col('subsector')==48, lit("Transportation and Warehousing"))
                                              .when(col('subsector')==49, lit("Transportation and Warehousing"))
                                              .when(col('subsector')==51, lit("Information"))
                                              .when(col('subsector')==52, lit("Finance and Insurance"))
                                              .when(col('subsector')==53, lit("Real Estate and Rental and Leasing"))
                                              .when(col('subsector')==54, lit("Professional, Scientific, and Technical Services"))
                                              .when(col('subsector')==55, lit("Management of Companies and Enterprises"))
                                              .when(col('subsector')==56, lit("Administrative and Support and Waste Management and Remediation Services"))
                                              .when(col('subsector')==61, lit("Educational Services"))
                                              .when(col('subsector')==62, lit("Health Care and Social Assistance"))
                                              .when(col('subsector')==71, lit("Arts, Entertainment, and Recreation"))
                                              .when(col('subsector')==72, lit("Accommodation and Food Services"))
                                              .when(col('subsector')==81, lit("Other Services (except Public Administration)"))
                                              .when(col('subsector')==92, lit("Public Administration"))
                                              .otherwise(lit(None))
                                )
                    .withColumn('Sector', when(col('subsector_name')=="Agriculture, Forestry, Fishing and Hunting", lit("Consumer Non-Cyclicals"))
                                                                .when(col('subsector_name')=="Mining, Quarrying, and Oil and Gas Extraction", lit("Energy"))
                                                                .when(col('subsector_name')=="Utilities", lit("Utilities"))
                                                                .when(col('subsector_name')=="Construction", lit("Industrials"))
                                                                .when(col('subsector_name')=="Manufacturing", lit("Industrials"))
                                                                .when(col('subsector_name')=="Wholesale Trade", lit("Industrials"))
                                                                .when(col('subsector_name')=="Retail Trade", lit("Consumer Cyclicals"))
                                                                .when(col('subsector_name')=="Transportation and Warehousing", lit("Industrials"))
                                                                .when(col('subsector_name')=="Information", lit("Technology"))
                                                                .when(col('subsector_name')=="Finance and Insurance", lit("Finance"))
                                                                .when(col('subsector_name')=="Real Estate and Rental and Leasing", lit("Finance"))
                                                                .when(col('subsector_name')=="Professional, Scientific, and Technical Services", lit("Business Services"))
                                                                .when(col('subsector_name')=="Management of Companies and Enterprises", lit("Business Services"))
                                                                .when(col('subsector_name')=="Administrative and Support and Waste Management and Remediation Services", lit("Business Services"))
                                                                .when(col('subsector_name')=="Educational Services", lit("Consumer Services"))
                                                                .when(col('subsector_name')=="Health Care and Social Assistance", lit("Healthcare"))
                                                                .when(col('subsector_name')=="Arts, Entertainment, and Recreation", lit("Consumer Services"))
                                                                .when(col('subsector_name')=="Accommodation and Food Services", lit("Consumer Services"))
                                                                .when(col('subsector_name')=="Other Services (except Public Administration)", lit("Consumer Services"))
                                                                .when(col('subsector_name')=="Public Administration", lit(None))
                                                                .otherwise(lit(None)))
                    .select('Sector','subsector','subsector_name','Question ID','Answer', *cols_to_clean)
                    )

In [10]:
# sample print
btos_survey_fnl.show(10)

+--------------------+---------+--------------------+-----------+-----------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+
|              Sector|subsector|      subsector_name|Question ID|     Answer|202319|202320|202321|202322|202323|202324|202325|202326|202401|202402|202403|202404|202405|202406|202407|202408|202409|202410|202411|202412|202413|202414|202415|202416|202417|202418|202419|202420|202421|202422|202423|202424|202425|202426|202501|202502|202503|202504|202505|202506|202507|202508|202509|202510|202511|202512|202513|202514|202515|202516|202517|202518|
+--------------------+---------+--------------------+-----------+-----------+------+------+------+------+------+----

In [11]:
# check the map
btos_survey_fnl.groupBy('Sector','subsector','subsector_name').count().sort('subsector').show(truncate=False)

[Stage 4:>                                                          (0 + 8) / 8]

+----------------------+---------+------------------------------------------------------------------------+-----+
|Sector                |subsector|subsector_name                                                          |count|
+----------------------+---------+------------------------------------------------------------------------+-----+
|Consumer Non-Cyclicals|11       |Agriculture, Forestry, Fishing and Hunting                              |6    |
|Energy                |21       |Mining, Quarrying, and Oil and Gas Extraction                           |6    |
|Utilities             |22       |Utilities                                                               |6    |
|Industrials           |23       |Construction                                                            |6    |
|Industrials           |31       |Manufacturing                                                           |6    |
|Industrials           |42       |Wholesale Trade                                       

                                                                                

In [12]:
# calculate average AI acceptance rate across the same sector
btos_survey_fnlfnl = (btos_survey_fnl
                      .filter(col('Answer')=="Yes")
                      .groupBy('Sector','Question ID')
                      .agg(*[F.avg(c).alias(f"avg_{c.replace(' ', '_')}") for c in cols_to_clean])
                      .sort('Sector','Question ID')
                      )

In [13]:
# sample print
btos_survey_fnlfnl.show()



+--------------------+-----------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+----------+------------------+----------+----------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+
|      

                                                                                

In [14]:
# check schema
btos_survey_fnlfnl.printSchema()

root
 |-- Sector: string (nullable = true)
 |-- Question ID: double (nullable = true)
 |-- avg_202319: double (nullable = true)
 |-- avg_202320: double (nullable = true)
 |-- avg_202321: double (nullable = true)
 |-- avg_202322: double (nullable = true)
 |-- avg_202323: double (nullable = true)
 |-- avg_202324: double (nullable = true)
 |-- avg_202325: double (nullable = true)
 |-- avg_202326: double (nullable = true)
 |-- avg_202401: double (nullable = true)
 |-- avg_202402: double (nullable = true)
 |-- avg_202403: double (nullable = true)
 |-- avg_202404: double (nullable = true)
 |-- avg_202405: double (nullable = true)
 |-- avg_202406: double (nullable = true)
 |-- avg_202407: double (nullable = true)
 |-- avg_202408: double (nullable = true)
 |-- avg_202409: double (nullable = true)
 |-- avg_202410: double (nullable = true)
 |-- avg_202411: double (nullable = true)
 |-- avg_202412: double (nullable = true)
 |-- avg_202413: double (nullable = true)
 |-- avg_202414: double (nullabl

<h4>Step 2: processing S&P Factset data

**Load S&P factset data**

In [15]:
SP500_factset = pd.read_excel(data_path + "S+P500_Factset.xlsx", engine="openpyxl")
SP500_factset = spark.createDataFrame(SP500_factset)

In [16]:
# sample print
SP500_factset.show()

+------+--------------------+--------------+--------------------+-------------+----------------+---------+----------+-------------------------+----------------------+------------------+-----------------------+---------------+--------------------+------------------------+--------------------------+-----------------------+--------------------------+
|Symbol|                Name|Stock Exchange|       RBICS Economy|Closing Price|    Market Value|    Sales|Unnamed: 7|1 Year Growth Total Sales|FE Sales Mean FY1 Roll|EBITDA Oper Income|FE Ebitda Mean FY1 Roll|Earns Per Share|FE Eps Mean FY1 Roll|FE Val Roe Mean FY1 Roll|Return on Avg Total Assets|FE Val Pe Mean FY1 Roll|Resrch and Develop Expense|
+------+--------------------+--------------+--------------------+-------------+----------------+---------+----------+-------------------------+----------------------+------------------+-----------------------+---------------+--------------------+------------------------+--------------------------+--

In [17]:
# check schema
SP500_factset.printSchema

<bound method DataFrame.printSchema of DataFrame[Symbol: string, Name: string, Stock Exchange: string, RBICS Economy: string, Closing Price: double, Market Value: double, Sales: double, Unnamed: 7: double, 1 Year Growth Total Sales: double, FE Sales Mean FY1 Roll: double, EBITDA Oper Income: double, FE Ebitda Mean FY1 Roll: double, Earns Per Share: double, FE Eps Mean FY1 Roll: double, FE Val Roe Mean FY1 Roll: double, Return on Avg Total Assets: double, FE Val Pe Mean FY1 Roll: double, Resrch and Develop Expense: double]>

In [18]:
# check sector classification
SP500_factset.groupBy('RBICS Economy').count().sort('RBICS Economy').show()

+--------------------+-----+
|       RBICS Economy|count|
+--------------------+-----+
|   Business Services|   15|
|  Consumer Cyclicals|   25|
|Consumer Non-Cycl...|   43|
|   Consumer Services|   27|
|              Energy|   21|
|             Finance|  102|
|          Healthcare|   59|
|         Industrials|   66|
|Non-Energy Materials|   31|
|          Technology|   75|
|  Telecommunications|    5|
|           Utilities|   31|
+--------------------+-----+



In [19]:
# define the columns that need to be cleaned
fin_data_cols = ['Closing Price',
                 'Market Value',
                 'Sales',
                 'Unnamed: 7', ## what's this?
                 '1 Year Growth Total Sales',
                 'FE Sales Mean FY1 Roll',
                 'EBITDA Oper Income',
                 'FE Ebitda Mean FY1 Roll',
                 'Earns Per Share',
                 'FE Eps Mean FY1 Roll',
                 'FE Val Roe Mean FY1 Roll',
                 'Return on Avg Total Assets',
                 'FE Val Pe Mean FY1 Roll',
                 'Resrch and Develop Expense'
                 ]

In [20]:
# clean the columns as they have NaN rather than nulls
SP500_factset_preprocess = SP500_factset

for c in fin_data_cols:
    if c in SP500_factset_preprocess.columns:
        SP500_factset_preprocess = SP500_factset_preprocess.withColumn(c, F.when(F.isnan(F.col(c)), None).otherwise(F.col(c)))

In [21]:
# group by sector and get averages
sector_SNP = (SP500_factset_preprocess
                .withColumn("RBICS Economy", F.trim(F.col("RBICS Economy")))
                .groupBy('RBICS Economy')
                .agg(*[F.avg(F.col(c).cast("double")).alias(f"avg_{c.replace(' ', '_')}")
                       for c in fin_data_cols])
                .withColumnRenamed('RBICS Economy', 'Sector')
                .sort('Sector')
                )

# sample print
sector_SNP.show()

+--------------------+------------------+------------------+------------------+------------------+-----------------------------+--------------------------+----------------------+---------------------------+-------------------+------------------------+----------------------------+------------------------------+---------------------------+------------------------------+
|              Sector| avg_Closing_Price|  avg_Market_Value|         avg_Sales|    avg_Unnamed:_7|avg_1_Year_Growth_Total_Sales|avg_FE_Sales_Mean_FY1_Roll|avg_EBITDA_Oper_Income|avg_FE_Ebitda_Mean_FY1_Roll|avg_Earns_Per_Share|avg_FE_Eps_Mean_FY1_Roll|avg_FE_Val_Roe_Mean_FY1_Roll|avg_Return_on_Avg_Total_Assets|avg_FE_Val_Pe_Mean_FY1_Roll|avg_Resrch_and_Develop_Expense|
+--------------------+------------------+------------------+------------------+------------------+-----------------------------+--------------------------+----------------------+---------------------------+-------------------+------------------------+-------

<h4>Step 3: get one table with AI usage data and financial data combined

In [22]:
## join the 2 tables:
joined_table = (btos_survey_fnlfnl
                .join(sector_SNP,["Sector"],"left")
                .sort('Sector','Question ID')
                )

# sample print
joined_table.show()

                                                                                

+--------------------+-----------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+----------+------------------+----------+----------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+--------

In [23]:
# one table with question 7:
# In the last two weeks, did this business use Artificial Intelligence (AI) in producing goods or services? 
# (Examples of AI: machine learning, natural language processing, virtual agents, voice recognition, etc.)

AI_usage_wth_fin_data = (joined_table
                         .filter(col('Question ID')==7))

AI_usage_wth_fin_data.show(5)




+--------------------+-----------+----------+-----------------+-----------------+------------------+----------+------------------+----------+----------+-----------------+----------+----------+------------------+----------+----------+-----------------+-----------------+----------+------------------+----------+-----------------+----------+----------+----------+------------------+----------+------------------+----------+------------------+------------------+----------+------------------+------------------+----------+------------------+------------------+----------+----------+------------------+------------------+----------+----------+------------------+----------+------------------+------------------+------------------+------------------+-----------------+----------+----------+----------+------------------+------------------+------------------+------------------+------------------+-----------------------------+--------------------------+----------------------+---------------------------+-

                                                                                

In [24]:
# one table with question 24:
# During the next six months, do you think this business will be using Artificial Intelligence (AI) in producing goods or services? 
# (Examples of AI: machine learning, natural language processing, virtual agents, voice recognition,

AI_acceptance_wth_fin_data = (joined_table
                              .filter(col('Question ID')==24))

AI_acceptance_wth_fin_data.show(5)

[Stage 35:>                                                         (0 + 8) / 8]

+--------------------+-----------+----------+-----------------+----------+------------------+----------+------------------+-----------------+----------+-----------------+----------+----------+------------------+----------+----------+-----------------+-----------------+------------------+------------------+----------+------------------+-----------------+-----------------+----------+------------------+----------+------------------+----------+------------------+-----------------+------------------+----------+------------------+----------+----------+----------+------------------+-----------------+----------+------------------+-----------------+------------------+----------+----------+----------+------------------+------------------+-----------------+----------+------------------+-----------------+----------+----------+------------------+------------------+------------------+------------------+-----------------------------+--------------------------+----------------------+------------------

                                                                                

<h4>Step 4: save the tables

In [25]:
AI_usage_wth_fin_data_pd = AI_usage_wth_fin_data.toPandas()
AI_acceptance_wth_fin_data_pd = AI_acceptance_wth_fin_data.toPandas()

AI_usage_wth_fin_data_pd.to_excel(data_path + "Saved_Tables/01_AI_usage_wth_fin_data.xlsx", index=False)
AI_acceptance_wth_fin_data_pd.to_excel(data_path + "Saved_Tables/01_AI_acceptance_wth_fin_data.xlsx", index=False)