In [None]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession

import java.time.LocalDateTime
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit.DAYS

In [None]:
// The Spark application is named after the moment this cell runs (yyyyMMddHHmmss).
val appName = {
    val stampFormat = DateTimeFormatter.ofPattern("yyyyMMddHHmmss")
    LocalDateTime.now().format(stampFormat)
}

// Obtain the session (or reuse a live one) with the executor sizing used by this notebook.
val spark = (
    SparkSession.builder()
    .appName(appName)
    .config("spark.executor.memory", "36g")
    .config("spark.executor.cores", 8)
    .config("spark.cores.max", 8)
    .getOrCreate()
    )

In [None]:
// MongoDB connection string shared by every read (and the disabled write-back) in this notebook.
// NOTE(review): the host part looks redacted/placeholder — supply the real URI before running.
val mongoUrl = "mongodb+srv://xxxxxxxxxxxxxxxxxxxxxx/"

In [None]:
// Raw report rows from coreEngine.ReportCleansing, without Mongo's `_id` column.
val df = (
    spark.read
    .format("mongodb")
    .options(Map(
        "spark.mongodb.read.connection.uri" -> mongoUrl,
        "spark.mongodb.write.connection.uri" -> mongoUrl,
        "database" -> "coreEngine",
        "collection" -> "ReportCleansing"
    ))
    .load()
    .drop("_id")
    )

In [None]:
// Elapsed time between two `yyyyMMdd` date strings, expressed in years (days / 365).
//   x: start date, y: end date.
// Returns null (None) when either input is null instead of failing the task with a
// NullPointerException; a malformed, non-null date string still raises as before.
val datePeriod = udf((x: String, y: String) => {
    // Built inside the closure: DateTimeFormatter does not implement Serializable,
    // so it should not be hoisted into a val captured by the UDF.
    val dateFormat = DateTimeFormatter.ofPattern("yyyyMMdd")
    for {
        from <- Option(x)
        to <- Option(y)
    } yield DAYS.between(LocalDate.parse(from, dateFormat), LocalDate.parse(to, dateFormat)).toDouble / 365.0
})

// Translates a report code into its quarter label ("1".."4").
// Any code outside the table, including null, becomes the empty string.
val reprtCodeToQuarter = udf((reprtCode: String) =>
    Map(
        "11013" -> "1",
        "11012" -> "2",
        "11014" -> "3",
        "11011" -> "4"
    ).getOrElse(reprtCode, "")
)

// Converts a month string ("01".."12") into its quarter label: ceil(month / 3) as a string.
// Returns null (None) for a null month instead of throwing a NullPointerException;
// a non-numeric, non-null string still raises NumberFormatException as before.
val dateToQuarter = udf((x: String) => {
    // scala.math.ceil is spelled out because `ceil` alone would resolve to the Spark SQL
    // column function brought in by org.apache.spark.sql.functions._
    Option(x).map(month => scala.math.ceil(month.toDouble / 3.0).toInt.toString)
})

// Per-stock report sequence, ordered by business year and then receipt number.
val partition = Window.partitionBy(col("stockCode")).orderBy(col("bsnsYear"), col("rceptNo"))
// One partition per (business year, receipt number).
val dsr_partition = Window.partitionBy(col("bsnsYear"), col("rceptNo")).orderBy(col("bsnsYear"), col("rceptNo"))
// Frame from the first row of the stock up to the current row (used for forward filling).
val ff = partition.rowsBetween(Window.unboundedPreceding, Window.currentRow)
// Frame covering the current row and the five rows before it.
val w_5 = partition.rowsBetween(-5L, Window.currentRow)

In [None]:
// Builds the wide (one row per report) table for one statement division.
//
//   fsDiv: value of the `fsDiv` column to keep ("CFS" or "OFS"); it is also used as the
//          prefix for the renamed account columns (e.g. "CFS_Assets").
//
// Steps: filter by fsDiv, pivot `accountId` into columns holding sum(thstrmAmount),
// derive `updateDate` (first 8 characters of rceptNo) and `quarter` (from reprtCode),
// then prefix the known account columns. withColumnRenamed is a no-op for an account
// that is absent from the pivot output, exactly as in the hand-written rename chain
// this helper replaces.
def pivotStatements(fsDiv: String): org.apache.spark.sql.DataFrame = {
    val reportKeys = Seq(
        "rceptNo", "reprtCode", "bsnsYear", "corpCode",
        "stockName", "stockCode", "fsDiv"
    )
    val accounts = Seq(
        "AccumulatedDepreciation",
        "AdjustmentsForInterestExpenses",
        "AllowanceForDoubtfulAcccount",
        "Assets",
        "CIS_CostOfSales",
        "CIS_OperatingIncomeLoss",
        "CIS_ProfitLoss",
        "CIS_ProfitLossBeforeTax",
        "CIS_Revenue",
        "CashAndCashEquivalents",
        "CashFlowsFromUsedInOperatingActivities",
        "CostOfSales",
        "CurrentAssets",
        "CurrentLiabilities",
        "Equity",
        "IntangibleAssetsOtherThanGoodwill",
        "InterestPaid",
        "Inventories",
        "IssuedCapital",
        "Liabilities",
        "NoncontrollingInterests",
        "NoncurrentAssets",
        "OperatingIncomeLoss",
        "OtherComprehensiveIncome",
        "ProfitLoss",
        "ProfitLossBeforeTax",
        "PropertyPlantAndEquipment",
        "RetainedEarnings",
        "Revenue",
        "ShortTermTradePayables",
        "ShortTermTradeReceivable"
    )

    val pivoted = df
        .where(col("fsDiv") === fsDiv)
        .select("accountId", ("thstrmAmount" +: reportKeys): _*)
        .groupBy(reportKeys.head, reportKeys.tail: _*)
        .pivot("accountId").agg(sum("thstrmAmount"))
        .drop("fsDiv")
        .withColumn("updateDate", substring(col("rceptNo"), 1, 8))
        .withColumn("quarter", reprtCodeToQuarter(col("reprtCode")))
        .orderBy(col("rceptNo"))

    accounts.foldLeft(pivoted) { (acc, account) =>
        acc.withColumnRenamed(account, s"${fsDiv}_$account")
    }
}

// Wide table of the rows whose fsDiv is "CFS"; account columns carry the CFS_ prefix.
val cfs = pivotStatements("CFS").cache()

// Wide table of the rows whose fsDiv is "OFS"; account columns carry the OFS_ prefix.
val ofs = pivotStatements("OFS").cache()

In [None]:
// Every CFS report row, with the OFS_* columns of the matching OFS report attached
// (left join, so CFS rows without an OFS counterpart are kept with null OFS_* values).
val temp_df = {
    val reportKeys = Seq(
        "rceptNo", "reprtCode", "bsnsYear", "corpCode",
        "stockName", "stockCode", "updateDate", "quarter"
    )
    cfs.join(ofs, reportKeys, "left")
}

In [None]:
// Merges the CFS and OFS amounts of one account into a single value.
//
//   cfs, ofs: the two amounts; either may be null.
//
// Rules, in order:
//   - one side null      -> the other side (null when both are null)
//   - OFS is 0           -> CFS
//   - CFS is 0           -> OFS
//   - otherwise          -> the larger of the two
//
// The parameters are boxed java.lang.Long on purpose: with primitive Long parameters
// Spark skips the function and returns null whenever any input is null, which made the
// "one side missing" branches unreachable and dropped the value that was present.
val reportPrepare = udf((cfs: java.lang.Long, ofs: java.lang.Long) => {
    val cfsAmount: Option[Long] = Option(cfs).map(_.longValue)
    val ofsAmount: Option[Long] = Option(ofs).map(_.longValue)

    val merged: Option[Long] = (cfsAmount, ofsAmount) match {
        case (None, other) => other
        case (other, None) => other
        case (Some(c), Some(0L)) => Some(c)
        case (Some(0L), Some(o)) => Some(o)
        case (Some(c), Some(o)) => Some(o.max(c))
    }
    merged
})

In [None]:
// Adds one merged column per account, computed by reportPrepare from the CFS_<account>
// and OFS_<account> columns of temp_df. The prefixed source columns are left in place.
val prepare_df = {
    val accounts = Seq(
        "AccumulatedDepreciation",
        "AdjustmentsForInterestExpenses",
        "AllowanceForDoubtfulAcccount",
        "Assets",
        "CIS_CostOfSales",
        "CIS_OperatingIncomeLoss",
        "CIS_ProfitLoss",
        "CIS_ProfitLossBeforeTax",
        "CIS_Revenue",
        "CashAndCashEquivalents",
        "CashFlowsFromUsedInOperatingActivities",
        "CostOfSales",
        "CurrentAssets",
        "CurrentLiabilities",
        "Equity",
        "IntangibleAssetsOtherThanGoodwill",
        "InterestPaid",
        "Inventories",
        "IssuedCapital",
        "Liabilities",
        "NoncontrollingInterests",
        "NoncurrentAssets",
        "OperatingIncomeLoss",
        "OtherComprehensiveIncome",
        "ProfitLoss",
        "ProfitLossBeforeTax",
        "PropertyPlantAndEquipment",
        "RetainedEarnings",
        "Revenue",
        "ShortTermTradePayables",
        "ShortTermTradeReceivable"
    )

    accounts.foldLeft(temp_df) { (acc, account) =>
        acc.withColumn(account, reportPrepare(col(s"CFS_$account"), col(s"OFS_$account")))
    }
}

In [None]:
// Distinct (stockCode, bsnsYear, quarter) keys that have at least one disclosure, each
// flagged with event = 1.
//
// Disclosures whose `reason` (with spaces removed) contains "해소" or "해제" are excluded
// first. NOTE(review): these presumably mark notices that an earlier designation was
// resolved/lifted — confirm with the data owner.
val disclosure = (
    spark.read.format("mongodb")
    .option("spark.mongodb.read.connection.uri", mongoUrl)
    .option("spark.mongodb.write.connection.uri", mongoUrl)
    .option("database", "coreEngine")
    .option("collection", "Disclosure")
    .load()
    .drop("_id")
    .select("disclosureTime", "reason", "stockCode")
    .where(!regexp_replace(col("reason"), " ", "").like("%해소%"))
    .where(!regexp_replace(col("reason"), " ", "").like("%해제%"))
    // Strip dashes and keep the first 8 characters, i.e. a yyyyMMdd string.
    .withColumn("disclosureTime", substring(regexp_replace(col("disclosureTime"), "-", ""), 1, 8))
    .withColumn("bsnsYear", substring(col("disclosureTime"), 1, 4))
    // Characters 5-6 are the month, which dateToQuarter turns into "1".."4".
    .withColumn("quarter", dateToQuarter(substring(col("disclosureTime"), 5, 2)))
    .withColumn("event", lit(1))
    .drop("reason", "disclosureTime")
    // The leading dot was missing here; the call only parsed as an accidental infix
    // application (`<expr> distinct ()`).
    .distinct()
    )

In [None]:
// Company master data (stockCode, estDt, corpCls) from coreEngine.CompanyInformation.
val company = (
    spark.read
    .format("mongodb")
    .options(Map(
        "spark.mongodb.read.connection.uri" -> mongoUrl,
        "spark.mongodb.write.connection.uri" -> mongoUrl,
        "database" -> "coreEngine",
        "collection" -> "CompanyInformation"
    ))
    .load()
    .drop("_id")
    .select("stockCode", "estDt", "corpCls")
    .cache()
    )

// Market capitalisation per stock and date (stockCode, marketCap, updateDate) from coreEngine.Price.
val price = (
    spark.read
    .format("mongodb")
    .options(Map(
        "spark.mongodb.read.connection.uri" -> mongoUrl,
        "spark.mongodb.write.connection.uri" -> mongoUrl,
        "database" -> "coreEngine",
        "collection" -> "Price"
    ))
    .load()
    .drop("_id")
    .select("stockCode", "marketCap", "updateDate")
    .cache()
    )

In [None]:
// Merged report rows enriched, via left joins, with the market cap of the report date,
// the company master data and the disclosure event flag of the same year/quarter.
val fs = {
    val withPrice = prepare_df.join(price, Seq("stockCode", "updateDate"), "left")
    val withCompany = withPrice.join(company, Seq("stockCode"), "left")
    withCompany.join(disclosure, Seq("stockCode", "bsnsYear", "quarter"), "left").cache()
}

In [None]:
// Cleans the joined statement table before feature engineering:
//   1. fills the plain income-statement accounts from their CIS_ counterparts,
//   2. replaces remaining nulls in numeric columns with 0,
//   3. rewrites several accounts by subtracting earlier rows of the same stock
//      (window `partition`: per stockCode, ordered by bsnsYear, rceptNo),
//   4. derives `period` = years between estDt and updateDate.
//
// Report codes as mapped by reprtCodeToQuarter: 11013 -> quarter 1, 11011 -> quarter 4.
// NOTE(review): the subtractions presumably turn cumulative / full-year figures into
// single-quarter figures — confirm against the source data definition.
val fs_preprocess = {
    // Value of `account` n rows earlier in the per-stock report sequence.
    def prior(account: String, n: Int) = lag(col(account), n).over(partition)

    // For reprtCode 11011 rows: the value minus the three preceding rows of the same
    // stock. All other rows keep their value.
    def lessPriorThree(account: String) =
        when(
            col("reprtCode") === "11011",
            col(account) - prior(account, 1) - prior(account, 2) - prior(account, 3)
        ).otherwise(col(account))

    fs
        // Prefer the plain account; fall back to the CIS_ variant when it is null.
        .withColumn("Revenue", coalesce(col("Revenue"), col("CIS_Revenue")))
        .withColumn("CostOfSales", coalesce(col("CostOfSales"), col("CIS_CostOfSales")))
        .withColumn("OperatingIncomeLoss", coalesce(col("OperatingIncomeLoss"), col("CIS_OperatingIncomeLoss")))
        .withColumn("ProfitLoss", coalesce(col("ProfitLoss"), col("CIS_ProfitLoss")))
        .withColumn("ProfitLossBeforeTax", coalesce(col("ProfitLossBeforeTax"), col("CIS_ProfitLossBeforeTax")))
        .drop("CIS_Revenue", "CIS_CostOfSales", "CIS_OperatingIncomeLoss", "CIS_ProfitLoss", "CIS_ProfitLossBeforeTax")
        .na.fill(0)

        // Cash flows from operating activities: rows with reprtCode 11013 keep their
        // value, every other row becomes the difference to the previous row.
        .withColumn(
            "CashFlowsFromUsedInOperatingActivities",
            when(
                col("reprtCode") === "11013",
                col("CashFlowsFromUsedInOperatingActivities")
            ).otherwise(
                col("CashFlowsFromUsedInOperatingActivities") -
                prior("CashFlowsFromUsedInOperatingActivities", 1)
            )
        )

        // Cost of sales.
        .withColumn("CostOfSales", lessPriorThree("CostOfSales"))

        // Interest paid: null -> 0, then previous row minus current row (sign flipped);
        // rows with reprtCode 11013 are simply negated.
        .withColumn("InterestPaid", coalesce(col("InterestPaid"), lit(0)))
        .withColumn(
            "InterestPaid",
            // `=!=` replaces the deprecated `!==` column operator.
            when(
                col("reprtCode") =!= "11013",
                prior("InterestPaid", 1) - col("InterestPaid")
            ).otherwise(col("InterestPaid") * -1)
        )

        // Operating income.
        .withColumn("OperatingIncomeLoss", lessPriorThree("OperatingIncomeLoss"))

        // Other comprehensive income.
        .withColumn("OtherComprehensiveIncome", lessPriorThree("OtherComprehensiveIncome"))

        // Net income.
        .withColumn("ProfitLoss", lessPriorThree("ProfitLoss"))

        // Profit before income tax.
        .withColumn("ProfitLossBeforeTax", lessPriorThree("ProfitLossBeforeTax"))

        // Revenue.
        .withColumn("Revenue", lessPriorThree("Revenue"))

        // Interest expense: every row except reprtCode 11013 becomes the difference to
        // the previous row.
        .withColumn(
            "AdjustmentsForInterestExpenses",
            when(
                col("reprtCode") =!= "11013",
                col("AdjustmentsForInterestExpenses") - prior("AdjustmentsForInterestExpenses", 1)
            ).otherwise(col("AdjustmentsForInterestExpenses"))
        )

        // A missing establishment date falls back to the stock's earliest updateDate.
        .withColumn(
            "estDt",
            coalesce(col("estDt"), min(col("updateDate")).over(Window.partitionBy("stockCode")))
        )
        .withColumn("period", datePeriod(col("estDt"), col("updateDate")))
        .drop("estDt")
}

In [None]:
// Feature table: forward-fills the listed accounts per stock, then appends the derived
// ratio / growth / DSR columns one after another (later ones read earlier ones), and
// finally sorts by stockCode, updateDate, reprtCode.
val fs_features = {
    // Accounts whose nulls are replaced by the latest non-null value seen so far for
    // the same stock (window frame `ff`).
    val carriedForward = Seq(
        "Assets", "CurrentAssets", "ShortTermTradeReceivable", "NoncurrentAssets",
        "Revenue", "OperatingIncomeLoss", "ProfitLossBeforeTax", "ProfitLoss",
        "CashFlowsFromUsedInOperatingActivities", "Liabilities", "ShortTermTradePayables",
        "Equity", "IssuedCapital", "RetainedEarnings", "MarketCap",
        "AdjustmentsForInterestExpenses", "Inventories", "CostOfSales",
        "AccumulatedDepreciation", "OtherComprehensiveIncome",
        "AllowanceForDoubtfulAcccount", "CurrentLiabilities", "NoncontrollingInterests"
    )

    // Value of the previous report row of the same stock.
    def previous(name: String) = lag(col(name), 1).over(partition)
    // numerator / denominator of two existing columns.
    def ratio(numerator: String, denominator: String) = col(numerator) / col(denominator)
    // (current - <name>Before) / <name>Before, using the materialised "...Before" column.
    def increaseRate(name: String) = (col(name) - col(s"${name}Before")) / col(s"${name}Before")
    // current / previous - 1.
    def growth(name: String) = col(name) / previous(name) - 1

    val perQuarter = Window.partitionBy("bsnsYear", "quarter")

    // Order matters: it fixes the column order of the result, and some entries depend
    // on columns produced by earlier entries.
    val derived = Seq(
        "InventoriesBefore" -> previous("Inventories"),
        "ShortTermTradeReceivableBefore" -> previous("ShortTermTradeReceivable"),
        "ShortTermTradePayablesBefore" -> previous("ShortTermTradePayables"),
        "InventoriesRateOfIncrease" -> increaseRate("Inventories"),
        "ShortTermTradeReceivableRateOfIncrease" -> increaseRate("ShortTermTradeReceivable"),
        "ShortTermTradePayablesRateOfIncrease" -> increaseRate("ShortTermTradePayables"),
        "SLEQ" -> ratio("Revenue", "Equity"),
        "FFOEQ" -> ratio("CashFlowsFromUsedInOperatingActivities", "Equity"),
        "FFOTL" -> ratio("CashFlowsFromUsedInOperatingActivities", "Liabilities"),
        "NISL" -> ratio("ProfitLoss", "Revenue"),
        "RETA" -> ratio("RetainedEarnings", "Assets"),
        "CACL" -> ratio("CurrentAssets", "CurrentLiabilities"),
        "EQTA" -> ratio("Equity", "Assets"),
        "INSL" -> ratio("AdjustmentsForInterestExpenses", "Revenue"),
        "CATA" -> ratio("CurrentAssets", "Assets"),
        "TLTA" -> ratio("Liabilities", "Assets"),
        "INTL" -> ratio("AdjustmentsForInterestExpenses", "Liabilities"),
        "CLTL" -> ratio("CurrentLiabilities", "Liabilities"),
        "TLEQ" -> ratio("Liabilities", "Equity"),
        "LNSL" -> log(col("Revenue")),
        "LNTA" -> log(col("Assets")),
        "MB" -> ratio("MarketCap", "Equity"),
        "SLFA" -> ratio("Revenue", "NoncurrentAssets"),
        "NIGR" -> growth("ProfitLoss"),
        "FAGR" -> growth("NoncurrentAssets"),
        "EBTIN" -> ratio("ProfitLossBeforeTax", "AdjustmentsForInterestExpenses"),
        "CLCA" -> ratio("CurrentLiabilities", "CurrentAssets"),
        // 1 when equity net of non-controlling interests is below 75% of issued capital.
        "NEGBE" -> when((col("Equity") - col("NoncontrollingInterests")) < col("IssuedCapital") * 0.75, 1).otherwise(lit(0)),
        "CLGR" -> growth("CurrentLiabilities"),
        "DSR01" -> ratio("InventoriesRateOfIncrease", "CLGR"),
        "DSR02" -> ratio("ShortTermTradeReceivableRateOfIncrease", "CLGR"),
        "DSR03" -> ratio("ShortTermTradePayablesRateOfIncrease", "CLGR"),
        "DSR04" -> ratio("Inventories", "CostOfSales"),
        "DSR05" -> ratio("AccumulatedDepreciation", "Inventories"),
        "DSR06" -> ratio("AllowanceForDoubtfulAcccount", "ShortTermTradeReceivable"),
        "DSR07" -> col("OtherComprehensiveIncome") * (col("OtherComprehensiveIncome") / col("Assets")),
        "Normalize" -> (col("CashFlowsFromUsedInOperatingActivities") - col("CurrentLiabilities") + col("OperatingIncomeLoss")),
        // Min-max scaling of Normalize across all rows of the same year and quarter.
        "MAX_N" -> max(col("Normalize")).over(perQuarter),
        "MIN_N" -> min(col("Normalize")).over(perQuarter),
        "DSR08" -> (col("Normalize") - col("MIN_N")) / (col("MAX_N") - col("MIN_N"))
    )

    val filled = carriedForward.foldLeft(fs_preprocess) { (acc, name) =>
        acc.withColumn(name, last(col(name), true).over(ff))
    }
    val enriched = derived.foldLeft(filled) { case (acc, (name, expr)) =>
        acc.withColumn(name, expr)
    }

    // Disabled projections kept from the original notebook:
//     .select("updateDate" ,"rceptNo", "reprtCode", "bsnsYear", "corpCode",
//             "stockName", "stockCode", "event", "corpCls", "period", "quarter",
//             "DSR01", "DSR02", "DSR03", "DSR04", "DSR05", "DSR06", "DSR07", "DSR08", "LNSL",
//             "LNTA", "MB", "SLFA", "NIGR", "FAGR", "EBTIN", "CLCA", "NEGBE", "TLEQ", "CLTL", "INTL", "TLTA",
//             "CATA", "INSL", "EQTA", "CACL", "RETA", "NISL", "FFOTL", "FFOEQ", "SLEQ", "CLGR")
//     .select("updateDate" , "bsnsYear", "stockName", "stockCode",  "Normalize",  "MAX_N",  "MIN_N",  "DSR08_Temp")
    enriched.orderBy("stockCode", "updateDate", "reprtCode")
}

In [None]:
// Preview the feature table in the notebook output (this triggers the whole pipeline).
fs_features.show()

In [None]:
// Write the feature table as CSV with a header row. repartition(1) funnels the data
// into a single partition before writing; no save mode is set, so the default applies.
fs_features.repartition(1).write.option("header","true").csv("s3a://etc-bak/fs.csv")

In [None]:
// Disabled MongoDB write-back: the code below is wrapped in a string literal, so the
// cell only evaluates to that string and nothing is written.
"""
(
    fs_features
    .where(col("bsnsYear") >= "2015")
    .write.format("mongodb")
    .mode("append")
    .option("spark.mongodb.read.connection.uri", mongoUrl)
    .option("spark.mongodb.write.connection.uri", mongoUrl)
    .option("database", "coreEngine")
    .option("collection", "ReportFeatures_temp")
    .save()
    )
    """

In [None]:
// Stop the Spark session started at the top of the notebook.
spark.stop()