# Transformasi Laporan Keuangan IDX Menggunakan PySpark

## Import Library

In [2]:
pip install pyspark

^C
Note: you may need to restart the kernel to use updated packages.


Defaulting to user installation because normal site-packages is not writeable


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import logging
import os

## Koneksi dan Import MongoDB ke PySpark

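Memerlukan instalasi Hadoop (lokasinya diatur melalui `HADOOP_HOME`) dan PySpark. MongoDB Spark Connector diunduh otomatis melalui `spark.jars.packages`. Server MongoDB harus dapat diakses melalui `MONGO_URI`.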

In [4]:
# Local Hadoop installation (Windows path). setdefault keeps an HADOOP_HOME that is
# already present in the environment instead of overwriting it.
os.environ.setdefault("HADOOP_HOME", "C:/hadoop/hadoop-2.7.1")


# MongoDB configuration. Each value may be overridden through an environment
# variable; the defaults reproduce the original local setup.
MONGO_URI = os.environ.get("IDX_MONGO_URI", "mongodb://localhost:27017")
DB_NAME = os.environ.get("IDX_MONGO_DB", "bigdatatugas")
# Input collection holding the raw XBRL documents that are read below.
STOCK_COLLECTION = os.environ.get("IDX_MONGO_INPUT_COLLECTION", "2024")

# Fungsi untuk inisialisasi Spark Session
def create_spark_session(
    app_name="Read Stock Data from MongoDB",
    input_uri=None,
    connector_package="org.mongodb.spark:mongo-spark-connector_2.12:3.0.1",
    executor_memory="4g",
    driver_memory="4g",
):
    """Build (or reuse) a SparkSession configured to read from MongoDB.

    Args:
        app_name: Spark application name.
        input_uri: MongoDB URI of the input collection, written to
            ``spark.mongodb.input.uri``. When None, it is built at call time as
            ``f"{MONGO_URI}/{DB_NAME}.{STOCK_COLLECTION}"`` from the module constants.
        connector_package: Maven coordinates passed to ``spark.jars.packages``
            (the MongoDB Spark connector used by ``format("mongo")``).
        executor_memory: Value for ``spark.executor.memory``.
        driver_memory: Value for ``spark.driver.memory``.

    Returns:
        The SparkSession returned by ``getOrCreate()``.

    Note:
        ``getOrCreate()`` returns an already-running session if one exists. JVM-level
        settings such as ``spark.jars.packages`` and ``spark.driver.memory`` cannot be
        changed on a running session, so restart the kernel after editing them.
    """
    if input_uri is None:
        input_uri = f"{MONGO_URI}/{DB_NAME}.{STOCK_COLLECTION}"

    return (SparkSession.builder
            .appName(app_name)
            .config("spark.jars.packages", connector_package)
            .config("spark.mongodb.input.uri", input_uri)
            .config("spark.executor.memory", executor_memory)
            .config("spark.driver.memory", driver_memory)
            .getOrCreate())


# Start (or reuse) the Spark session; the input collection comes from spark.mongodb.input.uri.
spark = create_spark_session()

# Load every document of the configured input collection.
df = (
    spark.read
    .format("mongo")
    .load()
)

# Quick preview of the raw documents (show() prints the first 20 rows by default).
df.show()

+--------------------+--------------------+--------------------+--------------------+------+
|                 _id|               facts|           file_info|      processed_date|ticker|
+--------------------+--------------------+--------------------+--------------------+------+
|{67c2b17b26a23596...|{IncomeTaxesRefun...|{instance.xbrl, T...|2025-03-01 21:04:...|  AALI|
|{67c2b1da26a23596...|{IncomeTaxesRefun...|{instance.xbrl, T...|2025-03-01 21:06:...|  ARNA|
|{67c2b1f026a23596...|{IncomeTaxesRefun...|{instance.xbrl, T...|2025-03-01 21:06:...|  ASGR|
|{67c2b1f226a23596...|{IncomeTaxesRefun...|{instance.xbrl, T...|2025-03-01 21:06:...|  ASII|
|{67c2b20626a23596...|{IncomeTaxesRefun...|{instance.xbrl, T...|2025-03-01 21:06:...|  AUTO|
|{67c2b20726a23596...|{IncomeTaxesRefun...|{instance.xbrl, T...|2025-03-01 21:06:...|  ADMF|
|{67c2b21d26a23596...|{NonPerformingFin...|{instance.xbrl, T...|2025-03-01 21:07:...|  BACA|
|{67c2b21f26a23596...|{NonPerformingFin...|{instance.xbrl, T...|2025-0

## Transformasi

In [5]:
def calculate_sum_if_exists(*columns):
    """Null-aware sum of Spark columns.

    While adding, a missing (null) component counts as 0. However, if every input
    column is null the result is null rather than 0, so "no data reported" stays
    distinguishable from a genuine zero total.

    Args:
        *columns: Spark Column expressions to add together.

    Returns:
        A Column holding the sum of the non-null inputs, or null when all inputs
        are null (and always null when called with no columns).
    """
    has_any_value = F.lit(False)
    for column in columns:
        has_any_value = has_any_value | column.isNotNull()

    # Left fold starting at lit(0): lit(0) + coalesce(c1, 0) + coalesce(c2, 0) + ...
    total = sum((F.coalesce(column, F.lit(0)) for column in columns), F.lit(0))

    # F.when without .otherwise() evaluates to null when the condition is false.
    return F.when(has_any_value, total)

### Transformasi Subsektor Perbankan

In [6]:
# =====================================
# 1. BANKS: G1. Banks
# =====================================
# Banks report no sales line, so revenue = interest income + sharia income.
# Third-/related-party borrowings are treated as short term, subordinated loans as long term.
banks_df = (
    df
    .where(F.col("facts.Subsector_CurrentYearInstant.value") == "G1. Banks")
    .select(
        # Identity
        F.col("facts.EntityName_CurrentYearInstant.value").alias("entity_name"),
        F.col("ticker").alias("emiten"),
        F.col("facts.CurrentPeriodEndDate_CurrentYearInstant.value").alias("report_date"),

        # Income statement (revenue is null only when both components are missing)
        calculate_sum_if_exists(
            F.col("facts.InterestIncome_CurrentYearDuration.value"),
            F.col("facts.SubtotalShariaIncome_CurrentYearDuration.value"),
        ).alias("revenue"),
        # NOTE(review): gross_profit and operating_profit both read ProfitFromOperation,
        # so the two columns are identical for banks — confirm this is intended.
        F.col("facts.ProfitFromOperation_CurrentYearDuration.value").alias("gross_profit"),
        F.col("facts.ProfitFromOperation_CurrentYearDuration.value").alias("operating_profit"),
        F.col("facts.ProfitLoss_CurrentYearDuration.value").alias("net_profit"),

        # Balance sheet
        F.col("facts.Cash_CurrentYearInstant.value").alias("cash"),
        F.col("facts.Assets_CurrentYearInstant.value").alias("total_assets"),
        calculate_sum_if_exists(
            F.col("facts.BorrowingsThirdParties_CurrentYearInstant.value"),
            F.col("facts.BorrowingsRelatedParties_CurrentYearInstant.value"),
        ).alias("short_term_borrowing"),
        calculate_sum_if_exists(
            F.col("facts.SubordinatedLoansThirdParties_CurrentYearInstant.value"),
            F.col("facts.SubordinatedLoansRelatedParties_CurrentYearInstant.value"),
        ).alias("long_term_borrowing"),
        F.col("facts.Equity_CurrentYearInstant.value").alias("total_equity"),
        F.col("facts.Liabilities_CurrentYearInstant.value").alias("liabilities"),

        # Cash flow
        F.col("facts.NetCashFlowsReceivedFromUsedInOperatingActivities_CurrentYearDuration.value").alias("cash_dari_operasi"),
        F.col("facts.NetCashFlowsReceivedFromUsedInInvestingActivities_CurrentYearDuration.value").alias("cash_dari_investasi"),
        F.col("facts.NetCashFlowsReceivedFromUsedInFinancingActivities_CurrentYearDuration.value").alias("cash_dari_pendanaan"),
    )
)

### Transformasi Subsektor Financing

In [7]:
# =====================================
# 2. FINANCING SERVICES: G2. Financing Service
# =====================================
# Financing companies: revenue sums five financing/fee income lines; "gross profit"
# is profit before income tax plus depreciation of investment property, lease,
# property & equipment, foreclosed and ijarah assets.
financing_df = (
    df
    .where(F.col("facts.Subsector_CurrentYearInstant.value") == "G2. Financing Service")
    .select(
        # Identity
        F.col("facts.EntityName_CurrentYearInstant.value").alias("entity_name"),
        F.col("ticker").alias("emiten"),
        F.col("facts.CurrentPeriodEndDate_CurrentYearInstant.value").alias("report_date"),

        # Income statement (each sum is null only when all of its components are missing)
        calculate_sum_if_exists(
            F.col("facts.IncomeFromMurabahahAndIstishna_CurrentYearDuration.value"),
            F.col("facts.IncomeFromConsumerFinancing_CurrentYearDuration.value"),
            F.col("facts.IncomeFromFinanceLease_CurrentYearDuration.value"),
            F.col("facts.AdministrationIncome_CurrentYearDuration.value"),
            F.col("facts.IncomeFromProvisionsAndCommissions_CurrentYearDuration.value"),
        ).alias("revenue"),
        calculate_sum_if_exists(
            F.col("facts.ProfitLossBeforeIncomeTax_CurrentYearDuration.value"),
            F.col("facts.DepreciationOfInvestmentPropertyLeaseAssetsPropertyAndEquipmentForeclosedAssetsAndIjarahAssets_CurrentYearDuration.value"),
        ).alias("gross_profit"),
        F.col("facts.ProfitLossBeforeIncomeTax_CurrentYearDuration.value").alias("operating_profit"),
        F.col("facts.ProfitLoss_CurrentYearDuration.value").alias("net_profit"),

        # Balance sheet
        F.col("facts.CashAndCashEquivalents_CurrentYearInstant.value").alias("cash"),
        F.col("facts.Assets_CurrentYearInstant.value").alias("total_assets"),
        # NOTE(review): the tag CurrentAccountsWithOtherBanksThirdParties reads like an
        # asset (accounts held at other banks) rather than a borrowing — confirm
        # against the IDX taxonomy before relying on short_term_borrowing.
        calculate_sum_if_exists(
            F.col("facts.BorrowingsThirdParties_CurrentYearInstant.value"),
            F.col("facts.CurrentAccountsWithOtherBanksThirdParties_CurrentYearInstant.value"),
        ).alias("short_term_borrowing"),
        # NOTE(review): borrowings are split into short/long term by counterparty
        # (third vs related party) rather than by maturity — confirm intended.
        calculate_sum_if_exists(
            F.col("facts.BorrowingsRelatedParties_CurrentYearInstant.value"),
            F.col("facts.BondsPayable_CurrentYearInstant.value"),
            F.col("facts.Sukuk_CurrentYearInstant.value"),
        ).alias("long_term_borrowing"),
        F.col("facts.Equity_CurrentYearInstant.value").alias("total_equity"),
        F.col("facts.Liabilities_CurrentYearInstant.value").alias("liabilities"),

        # Cash flow
        F.col("facts.NetCashFlowsReceivedFromUsedInOperatingActivities_CurrentYearDuration.value").alias("cash_dari_operasi"),
        F.col("facts.NetCashFlowsReceivedFromUsedInInvestingActivities_CurrentYearDuration.value").alias("cash_dari_investasi"),
        F.col("facts.NetCashFlowsReceivedFromUsedInFinancingActivities_CurrentYearDuration.value").alias("cash_dari_pendanaan"),
    )
)

### Transformasi Subsektor Investment

In [8]:
# =====================================
# 3. INVESTMENT SERVICE: G3. Investment Service
# =====================================
# Revenue components (brokerage, underwriting/selling fees, investment management),
# shared by the revenue and gross_profit expressions so the definition lives in one place.
investment_revenue_cols = [
    F.col("facts.IncomeFromBrokerageActivity_CurrentYearDuration.value"),
    F.col("facts.IncomeFromUnderwritingActivitiesAndSellingFees_CurrentYearDuration.value"),
    F.col("facts.IncomeFromInvestmentManagementServices_CurrentYearDuration.value"),
]

investment_df = df.filter(
    F.col("facts.Subsector_CurrentYearInstant.value") == "G3. Investment Service"
).select(
    # Same leading column order as the other sector frames
    F.col("facts.EntityName_CurrentYearInstant.value").alias("entity_name"),
    F.col("ticker").alias("emiten"),
    F.col("facts.CurrentPeriodEndDate_CurrentYearInstant.value").alias("report_date"),

    # Income Statement
    calculate_sum_if_exists(*investment_revenue_cols).alias("revenue"),

    # gross_profit = revenue - general & administrative expenses.
    # Spark arithmetic with a null operand yields null, so this is null when all revenue
    # components are missing or when the G&A expense is missing.
    (
        calculate_sum_if_exists(*investment_revenue_cols)
        - F.col("facts.GeneralAndAdministrativeExpenses_CurrentYearDuration.value")
    ).alias("gross_profit"),

    F.col("facts.ProfitLossBeforeIncomeTax_CurrentYearDuration.value").alias("operating_profit"),
    F.col("facts.ProfitLoss_CurrentYearDuration.value").alias("net_profit"),

    # Balance Sheet
    F.col("facts.CashAndCashEquivalents_CurrentYearInstant.value").alias("cash"),
    F.col("facts.Assets_CurrentYearInstant.value").alias("total_assets"),
    F.col("facts.BankLoans_CurrentYearInstant.value").alias("short_term_borrowing"),

    # NOTE(review): this is prior-year-end BankLoans minus current BankLoans, i.e. the
    # year-over-year decrease in bank loans, not a long-term borrowing balance — confirm
    # which fact should be used. Null when either value is missing.
    (
        F.col("facts.BankLoans_PriorEndYearInstant.value")
        - F.col("facts.BankLoans_CurrentYearInstant.value")
    ).alias("long_term_borrowing"),

    F.col("facts.Equity_CurrentYearInstant.value").alias("total_equity"),
    F.col("facts.Liabilities_CurrentYearInstant.value").alias("liabilities"),

    # Cash Flow
    F.col("facts.NetCashFlowsReceivedFromUsedInOperatingActivities_CurrentYearDuration.value").alias("cash_dari_operasi"),
    F.col("facts.NetCashFlowsReceivedFromUsedInInvestingActivities_CurrentYearDuration.value").alias("cash_dari_investasi"),
    F.col("facts.NetCashFlowsReceivedFromUsedInFinancingActivities_CurrentYearDuration.value").alias("cash_dari_pendanaan")
)

### Transformasi Subsektor Insurance

In [9]:
# =====================================
# 4. INSURANCE: G4. Insurance
# =====================================
# Insurers: revenue is premium income. Gross profit is premiums minus claim expenses
# and reinsurance claims (a missing claim item counts as 0), but it is only computed
# when premiums and at least one of the two claim items are reported.
insurance_df = (
    df
    .where(F.col("facts.Subsector_CurrentYearInstant.value") == "G4. Insurance")
    .select(
        # Identity
        F.col("facts.EntityName_CurrentYearInstant.value").alias("entity_name"),
        F.col("ticker").alias("emiten"),
        F.col("facts.CurrentPeriodEndDate_CurrentYearInstant.value").alias("report_date"),

        # Income statement
        F.col("facts.RevenueFromInsurancePremiums_CurrentYearDuration.value").alias("revenue"),
        # NOTE(review): ReinsuranceClaims is subtracted like an expense; if this tag holds
        # claim recoveries from reinsurers, the sign may need flipping — confirm.
        F.when(
            F.col("facts.RevenueFromInsurancePremiums_CurrentYearDuration.value").isNotNull()
            & (
                F.col("facts.ClaimExpenses_CurrentYearDuration.value").isNotNull()
                | F.col("facts.ReinsuranceClaims_CurrentYearDuration.value").isNotNull()
            ),
            F.col("facts.RevenueFromInsurancePremiums_CurrentYearDuration.value")
            - F.coalesce(F.col("facts.ClaimExpenses_CurrentYearDuration.value"), F.lit(0))
            - F.coalesce(F.col("facts.ReinsuranceClaims_CurrentYearDuration.value"), F.lit(0)),
        ).alias("gross_profit"),
        F.col("facts.ProfitLossBeforeIncomeTax_CurrentYearDuration.value").alias("operating_profit"),
        F.col("facts.ProfitLoss_CurrentYearDuration.value").alias("net_profit"),

        # Balance sheet
        F.col("facts.CashAndCashEquivalents_CurrentYearInstant.value").alias("cash"),
        F.col("facts.Assets_CurrentYearInstant.value").alias("total_assets"),
        # NOTE(review): claim and reinsurance payables are used as a proxy for short-term
        # borrowing, and future-policy-benefit liabilities for long-term borrowing.
        calculate_sum_if_exists(
            F.col("facts.ClaimPayables_CurrentYearInstant.value"),
            F.col("facts.ReinsurancePayables_CurrentYearInstant.value"),
        ).alias("short_term_borrowing"),
        F.col("facts.InsuranceLiabilitiesForFuturePolicyBenefits_CurrentYearInstant.value").alias("long_term_borrowing"),
        F.col("facts.Equity_CurrentYearInstant.value").alias("total_equity"),
        F.col("facts.Liabilities_CurrentYearInstant.value").alias("liabilities"),

        # Cash flow
        F.col("facts.NetCashFlowsReceivedFromUsedInOperatingActivities_CurrentYearDuration.value").alias("cash_dari_operasi"),
        F.col("facts.NetCashFlowsReceivedFromUsedInInvestingActivities_CurrentYearDuration.value").alias("cash_dari_investasi"),
        F.col("facts.NetCashFlowsReceivedFromUsedInFinancingActivities_CurrentYearDuration.value").alias("cash_dari_pendanaan"),
    )
)

### Transformasi Emiten Umum (Non-bank, non-financing, non-insurance, non-investment)

In [10]:
# =====================================
# 5. OTHER SECTORS (everything except banks, financing, investment and insurance)
# =====================================
non_special_df = df.filter(
    # `~isin(...)` alone evaluates to NULL when the subsector value is missing, and
    # filter() drops NULL rows. Such filings would then appear in none of the five
    # sector frames, so they are kept explicitly in this general bucket.
    F.col("facts.Subsector_CurrentYearInstant.value").isNull()
    | ~F.col("facts.Subsector_CurrentYearInstant.value").isin(
        ["G1. Banks", "G2. Financing Service", "G3. Investment Service", "G4. Insurance"]
    )
).select(
    F.col("facts.EntityName_CurrentYearInstant.value").alias("entity_name"),
    F.col("ticker").alias("emiten"),
    F.col("facts.CurrentPeriodEndDate_CurrentYearInstant.value").alias("report_date"),
    F.col("facts.SalesAndRevenue_CurrentYearDuration.value").alias("revenue"),
    F.col("facts.GrossProfit_CurrentYearDuration.value").alias("gross_profit"),
    F.col("facts.ProfitLossBeforeIncomeTax_CurrentYearDuration.value").alias("operating_profit"),
    F.col("facts.ProfitLoss_CurrentYearDuration.value").alias("net_profit"),
    F.col("facts.CashAndCashEquivalents_CurrentYearInstant.value").alias("cash"),
    F.col("facts.Assets_CurrentYearInstant.value").alias("total_assets"),

    # Short-term borrowing: the FIRST non-null of these tags, in this priority order.
    # NOTE(review): coalesce keeps only one value. A company reporting both ShortTermBankLoans
    # and CurrentMaturitiesOfBankLoans gets only the former, unlike the summed components
    # used for the financial subsectors. Advances from customers are also not a borrowing
    # in the usual sense — confirm intended.
    F.coalesce(
        F.col("facts.ShortTermBankLoans_CurrentYearInstant.value"),
        F.col("facts.CurrentMaturitiesOfBankLoans_CurrentYearInstant.value"),
        F.col("facts.OtherCurrentFinancialLiabilities_CurrentYearInstant.value"),
        F.col("facts.ShortTermDerivativeFinancialLiabilities_CurrentYearInstant.value"),
        F.col("facts.CurrentAdvancesFromCustomersThirdParties_CurrentYearInstant.value")
    ).alias("short_term_borrowing"),

    F.col("facts.LongTermBankLoans_CurrentYearInstant.value").alias("long_term_borrowing"),
    F.col("facts.Equity_CurrentYearInstant.value").alias("total_equity"),
    F.col("facts.Liabilities_CurrentYearInstant.value").alias("liabilities"),
    F.col("facts.NetCashFlowsReceivedFromUsedInOperatingActivities_CurrentYearDuration.value").alias("cash_dari_operasi"),
    F.col("facts.NetCashFlowsReceivedFromUsedInInvestingActivities_CurrentYearDuration.value").alias("cash_dari_investasi"),
    F.col("facts.NetCashFlowsReceivedFromUsedInFinancingActivities_CurrentYearDuration.value").alias("cash_dari_pendanaan")
)

### Menggabungkan seluruh data

In [11]:
# =====================================
# Combine all DataFrames and Sort
# =====================================
# unionByName matches columns by name rather than by position, so the sector frames
# do not need to list their columns in the same order.
final_df = (
    banks_df
    .unionByName(financing_df)
    .unionByName(investment_df)
    .unionByName(insurance_df)
    .unionByName(non_special_df)
    # final_df feeds several actions (count and show here, the MongoDB write later).
    # Caching avoids recomputing the whole five-branch union from MongoDB for each action.
    .cache()
)

# Display every row sorted by emiten descending. The sort only affects this display;
# final_df itself (and what gets written later) is not reordered.
row_count = final_df.count()
final_df.orderBy("emiten", ascending=False).show(truncate=False, n=row_count)

+-----------------------------------------------+------+-----------+-----------------+---------------+----------------+--------------+--------------+----------------+--------------------+-------------------+---------------+----------------+-----------------+-------------------+-------------------+
|entity_name                                    |emiten|report_date|revenue          |gross_profit   |operating_profit|net_profit    |cash          |total_assets    |short_term_borrowing|long_term_borrowing|total_equity   |liabilities     |cash_dari_operasi|cash_dari_investasi|cash_dari_pendanaan|
+-----------------------------------------------+------+-----------+-----------------+---------------+----------------+--------------+--------------+----------------+--------------------+-------------------+---------------+----------------+-----------------+-------------------+-------------------+
|PT Wahana Ottomitra Multiartha Tbk             |WOMF  |2024-12-31 |1.856807E12      |4.10065E11     |3

## Menulis Data ke MongoDB

In [None]:

STOCK_COLLECTION_OUTPUT = "2024_transformed"

# Write final_df to MongoDB. mode("overwrite") replaces the existing contents of the
# output collection.
try:
    (
        final_df.write.format("mongo")
        .option("uri", MONGO_URI)
        .option("database", DB_NAME)
        .option("collection", STOCK_COLLECTION_OUTPUT)
        .mode("overwrite")
        .save()
    )
    print("Data successfully written to MongoDB.")
except Exception as e:
    print(f"An error occurred while writing to MongoDB: {e}")
    # Re-raise so a failed write fails the cell (and Run All) instead of the
    # notebook appearing to finish successfully.
    raise