## Data Preprocessing

In [1]:
# findspark.init() is run here, before pyspark is imported in the next cell.
# NOTE(review): presumably this locates the local Spark installation and makes
# pyspark importable -- confirm against the findspark documentation.
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

# Configure the application name first, then reuse the active session if one
# exists or start a new one otherwise.
session_builder = SparkSession.builder.appName('abc')
spark = session_builder.getOrCreate()

# Bare expression so the notebook displays the session object.
spark

In [54]:
# Load the Parquet data stored under this S3 prefix into a Spark DataFrame.
df = spark.read.parquet('s3://sec-finc/pivot_data_vpq/')

In [5]:
# Shape of the loaded data, displayed as (number of rows, number of columns).
row_count = df.count()
column_count = len(df.columns)
row_count, column_count

(477717, 45)

Drop the two columns (`segment_label` and `segment_hash`) that are filled with null values.

In [8]:
# Remove both segment columns in a single call; the note above this cell
# describes them as filled with null values.
df = df.drop('segment_label', 'segment_hash')

In [9]:
from pyspark.sql.functions import isnan, count, when, col, isnull

In [10]:
# Names of the columns whose null counts are computed in the next cell.
# NOTE(review): this assignment rebinds `col`, which an earlier cell imported
# from pyspark.sql.functions; once this cell has run, `col` refers to this list
# and no longer to the pyspark function.
col = [
    # identifier and filing-level columns
    "cik",
    "company_name",
    "assigned_sic",
    "accession_number_int",
    "document_fiscal_year_focus",
    "datapoint_label",
    "statement_type",
    # financial data columns
    "CostOfGoodsAndServicesSold",
    "CostOfGoodsSold",
    "CostOfServices",
    "EarningsPerShareBasic",
    "EarningsPerShareDiluted",
    "GainLossOnDispositionOfAssets",
    "GeneralAndAdministrativeExpense",
    "IncomeTaxesPaid",
    "IncreaseDecreaseInAccountsPayable",
    "IncreaseDecreaseInAccountsReceivable",
    "IncreaseDecreaseInAccruedLiabilities",
    "IncreaseDecreaseInInventories",
    "LaborAndRelatedExpense",
    "NetCashProvidedByUsedInFinancingActivities",
    "NetCashProvidedByUsedInFinancingActivitiesContinuingOperations",
    "NetCashProvidedByUsedInInvestingActivities",
    "NetCashProvidedByUsedInInvestingActivitiesContinuingOperations",
    "NetCashProvidedByUsedInOperatingActivities",
    "NetCashProvidedByUsedInOperatingActivitiesContinuingOperations",
    "NetIncomeLoss",
    "OperatingExpenses",
    "OperatingIncomeLoss",
    "PaymentsForRepurchaseOfCommonStock",
    "PaymentsOfDividends",
    "PaymentsOfDividendsCommonStock",
    "PaymentsOfFinancingCosts",
    "PaymentsToAcquireBusinessesNetOfCashAcquired",
    "PaymentsToAcquirePropertyPlantAndEquipment",
    "RepaymentsOfLongTermDebt",
    "ResearchAndDevelopmentExpense",
    "Revenues",
    "SellingAndMarketingExpense",
    "ShareBasedCompensation",
]

Show the number of nulls in each column.

In [11]:
# One aggregate per listed column, each aliased to the column's own name:
# the when() expression is only non-null on rows where the column is null,
# and count() counts the non-null results, giving the per-column null count.
null_count_exprs = [count(when(isnull(name), name)).alias(name) for name in col]
df.select(null_count_exprs).show()

+---+------------+------------+--------------------+--------------------------+---------------+--------------+--------------------------+---------------+--------------+---------------------+-----------------------+-----------------------------+-------------------------------+---------------+---------------------------------+------------------------------------+------------------------------------+-----------------------------+----------------------+------------------------------------------+--------------------------------------------------------------+------------------------------------------+--------------------------------------------------------------+------------------------------------------+--------------------------------------------------------------+-------------+-----------------+-------------------+----------------------------------+-------------------+------------------------------+------------------------+--------------------------------------------+---------------------

Print the data type of each column.

In [12]:
# Print each column name next to its Spark data type, in schema order.
for field in df.schema.fields:
    print(field.name, field.dataType)

cik IntegerType
company_name StringType
assigned_sic IntegerType
accession_number_int LongType
filing_date DateType
document_fiscal_year_focus IntegerType
datapoint_label StringType
start_date DateType
end_date DateType
statement_type StringType
CostOfGoodsAndServicesSold DoubleType
CostOfGoodsSold DoubleType
CostOfServices DoubleType
EarningsPerShareBasic DoubleType
EarningsPerShareDiluted DoubleType
GainLossOnDispositionOfAssets DoubleType
GeneralAndAdministrativeExpense DoubleType
IncomeTaxesPaid DoubleType
IncreaseDecreaseInAccountsPayable DoubleType
IncreaseDecreaseInAccountsReceivable DoubleType
IncreaseDecreaseInAccruedLiabilities DoubleType
IncreaseDecreaseInInventories DoubleType
LaborAndRelatedExpense DoubleType
NetCashProvidedByUsedInFinancingActivities DoubleType
NetCashProvidedByUsedInFinancingActivitiesContinuingOperations DoubleType
NetCashProvidedByUsedInInvestingActivities DoubleType
NetCashProvidedByUsedInInvestingActivitiesContinuingOperations DoubleType
NetCashProvi

In [13]:
from pyspark import sql

In [14]:
# Register df under the view name "dfTable" so that it can be queried with spark.sql().
df.createOrReplaceTempView("dfTable")

Create a table, `df_merged`, in which each row holds the financial data of one company for one fiscal year.

In [15]:
# Collapse dfTable into one row per (company_name, document_fiscal_year_focus),
# taking max() of every other selected column within each group.
#
# The query text is generated from the column lists below rather than written
# out by hand, so every output column is aliased to exactly the column it
# aggregates and adding or removing a metric is a one-line change. The result
# has the same columns, in the same order, as the hand-written query it replaces.
#
# NOTE(review): rows are grouped by company_name while cik is only aggregated
# with max(); if two different registrants share a name they are merged into a
# single row -- confirm company_name is unique per cik in this dataset.
# NOTE(review): max() is applied to each column independently, so one output row
# can combine values that originate from different source rows -- confirm this
# is the intended way to merge the rows of a company/fiscal year.
group_by_columns = ['company_name', 'document_fiscal_year_focus']

max_columns = [
    'cik',
    'assigned_sic',
    'accession_number_int',
    'filing_date',
    'CostOfGoodsAndServicesSold',
    'CostOfGoodsSold',
    'CostOfServices',
    'EarningsPerShareBasic',
    'EarningsPerShareDiluted',
    'GainLossOnDispositionOfAssets',
    'GeneralAndAdministrativeExpense',
    'IncomeTaxesPaid',
    'IncreaseDecreaseInAccountsPayable',
    'IncreaseDecreaseInAccountsReceivable',
    'IncreaseDecreaseInAccruedLiabilities',
    'IncreaseDecreaseInInventories',
    'LaborAndRelatedExpense',
    'NetCashProvidedByUsedInFinancingActivities',
    'NetCashProvidedByUsedInFinancingActivitiesContinuingOperations',
    'NetCashProvidedByUsedInInvestingActivities',
    'NetCashProvidedByUsedInInvestingActivitiesContinuingOperations',
    'NetCashProvidedByUsedInOperatingActivities',
    'NetCashProvidedByUsedInOperatingActivitiesContinuingOperations',
    'NetIncomeLoss',
    'OperatingExpenses',
    'OperatingIncomeLoss',
    'PaymentsForRepurchaseOfCommonStock',
    'PaymentsOfDividends',
    'PaymentsOfDividendsCommonStock',
    'PaymentsOfFinancingCosts',
    'PaymentsToAcquireBusinessesNetOfCashAcquired',
    'PaymentsToAcquirePropertyPlantAndEquipment',
    'RepaymentsOfLongTermDebt',
    'ResearchAndDevelopmentExpense',
    'Revenues',
    'SellingAndMarketingExpense',
    'ShareBasedCompensation',
]

# Group keys are selected as-is; every other column becomes "max(x) as x".
select_list = group_by_columns + [
    'max({0}) as {0}'.format(name) for name in max_columns
]

df_merged = spark.sql(
    'select {select_list} from dfTable group by {group_keys}'.format(
        select_list=', '.join(select_list),
        group_keys=', '.join(group_by_columns),
    )
)

Create another table, `df_temp`, which is identical to `df_merged` but has one additional column, `profit_year` (the fiscal year plus one).

In [16]:
# profit_year is the row's fiscal year plus one; every other column of
# df_merged is carried over unchanged.
following_fiscal_year = df_merged['document_fiscal_year_focus'] + 1
df_temp = df_merged.withColumn('profit_year', following_fiscal_year)

In [17]:
# Expose both DataFrames to spark.sql() under the view names used by the join query.
for view_name, frame in (("dfMergedTable", df_merged), ("dfTempTable", df_temp)):
    frame.createOrReplaceTempView(view_name)

Join `df_temp` and `df_merged` on the condition that the company names match and the fiscal year in `df_merged` equals the profit year in `df_temp`.

In [28]:
# Pair every company/fiscal-year row with the same company's OperatingIncomeLoss
# from the row whose fiscal year equals that row's profit_year:
#   b (dfTempTable)   supplies the data columns, its fiscal year (output as `year`)
#                     and profit_year;
#   a (dfMergedTable) supplies OperatingIncomeLoss (output as `profit`).
# Being an inner join, only rows of b that have a matching row in a are kept.
#
# The projection is built from df_merged.columns instead of a hand-written list,
# so it cannot drift from the columns df_merged actually has. cik and
# company_name are placed first and document_fiscal_year_focus is left out of
# the derived part because it is selected separately under the alias `year`;
# with the df_merged defined in this notebook this yields the same output
# columns in the same order as the hand-written query it replaces.
leading_columns = ['cik', 'company_name']
handled_separately = leading_columns + ['document_fiscal_year_focus']
carried_columns = leading_columns + [
    name for name in df_merged.columns if name not in handled_separately
]

projection = ', '.join(
    ['b.{}'.format(name) for name in carried_columns]
    + [
        'b.document_fiscal_year_focus as year',
        'b.profit_year',
        'a.OperatingIncomeLoss as profit',
    ]
)

df_temp1 = spark.sql(
    'select {projection} '
    'from dfMergedTable a inner join dfTempTable b '
    'on a.document_fiscal_year_focus == b.profit_year '
    'and a.company_name == b.company_name'.format(projection=projection)
)

In [29]:
# Preview the first five joined rows without truncating cell contents.
df_temp1.show(n=5, truncate=False)

+-------+-------------------------------------+------------+--------------------+-----------+--------------------------+---------------+--------------+---------------------+-----------------------+-----------------------------+-------------------------------+---------------+---------------------------------+------------------------------------+------------------------------------+-----------------------------+----------------------+------------------------------------------+--------------------------------------------------------------+------------------------------------------+--------------------------------------------------------------+------------------------------------------+--------------------------------------------------------------+-------------+-----------------+-------------------+----------------------------------+-------------------+------------------------------+------------------------+--------------------------------------------+--------------------------------------

In [30]:
# Number of rows in the joined result.
df_temp1.count()

31916

In [46]:
# Write the joined result as Parquet to the target path, using "overwrite" save mode.
overwrite_writer = df_temp1.write.mode("overwrite")
overwrite_writer.parquet("s3://502-project-1/data")