In [0]:
spark.sql("USE CATALOG crowd2fund")
spark.sql("USE SCHEMA default")

In [0]:
%sql
DROP TABLE IF EXISTS add_funds;
DROP TABLE IF EXISTS withdraw_funds;
DROP TABLE IF EXISTS add_withdraw_funds;
DROP TABLE IF EXISTS add_withdraw_funds_with_balance;

In [0]:
add_funds = spark.table("wallet_transactions").filter("From_To = 'Add Funds'").select("Date", "Amount_In")
add_funds = add_funds.withColumnRenamed("Amount_In", "Amount")
display(add_funds)

In [0]:
withdraw_funds = spark.table("wallet_transactions").filter("From_To = 'Withdrawal Request'").select("Date", "Amount_Out")
withdraw_funds = withdraw_funds.withColumnRenamed("Amount_Out", "Amount")
display(withdraw_funds)

In [0]:
from pyspark.sql.functions import col

withdraw_funds = withdraw_funds.withColumn("Amount", -col("Amount"))
display(withdraw_funds)

In [0]:
add_withdraw_funds = add_funds.unionByName(withdraw_funds)
add_withdraw_funds.write.mode("overwrite").saveAsTable("add_withdraw_funds")

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import sum

add_withdraw_funds = spark.table("add_withdraw_funds")
window_spec = Window.orderBy("Date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
add_withdraw_funds = add_withdraw_funds.withColumn("Balance", sum("Amount").over(window_spec))
add_withdraw_funds.write.mode("overwrite").saveAsTable("add_withdraw_funds_with_balance")

In [0]:
add_withdraw_funds_with_balance = spark.table("add_withdraw_funds_with_balance").orderBy("Date")
display(add_withdraw_funds_with_balance.select("Date", "Amount", "Balance"))

Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import col

exchange_with_transactions = (
    spark.table("wallet_transactions")
    .alias("wt")
    .join(
        spark.table("my_investments").alias("mi"),
        (col("wt.Date") == col("mi.Investment_date")) &
        (col("wt.Amount_Out") == col("mi.Invested_GBP"))
    )
    .filter(
        col("wt.From_To").rlike("^Exchange with") &
        col("wt.Amount_Out").isNotNull() &
        col("mi.Invested_GBP").isNotNull()
    )
    .select(
        col("wt.Date"),
        col("wt.Amount_In"),
        col("wt.Amount_Out"),
        col("mi.Business_name")
    )
)

other_business_transactions = (
    spark.table("wallet_transactions")
    .alias("wt")
    .filter("From_To <> 'Withdrawal Request'")
    .filter("From_To <> 'Add Funds'")    
    .filter(
        ~col("wt.From_To").rlike("^Exchange with")
    )
    .select(
        col("wt.Date"),
        col("wt.Amount_In"),
        col("wt.Amount_Out"),
        col("wt.From_To").alias("Business_name")
    )
)

business_transactions = other_business_transactions.unionByName(exchange_with_transactions)
business_transactions.write.mode("overwrite").saveAsTable("business_transactions")

In [0]:
%sql
SELECT
  `Business_name`,
  SUM(COALESCE(`Amount_In`, 0)) - SUM(COALESCE(`Amount_Out`, 0)) As Profit
FROM
  business_transactions_cleaned
WHERE
GROUP BY
  `Business_name`
ORDER BY Profit


In [0]:
%sql
SELECT
  TRIM(
    REGEXP_REPLACE(
      UPPER(TRIM(`Business_name`)), '\s*(LTD|LIMITED|LTD\.|LIMITED\.|PLC|INC)\.?\s*$', '', 1
    )
  ) AS base_name,
  COLLECT_SET(`Business_name`) AS variants
FROM
  `crowd2fund`.`default`.`business_transactions`
WHERE
  `Business_name` IS NOT NULL
GROUP BY
  base_name
HAVING
  COUNT(DISTINCT `Business_name`) > 1

In [0]:
%sql
SELECT * FROM business_base_names

In [0]:
%sql
CREATE OR REPLACE TABLE business_base_names AS
SELECT
  TRIM(
    REGEXP_REPLACE(
      UPPER(TRIM(`Business_name`)), '\s*(LTD|LIMITED|LTD\.|LIMITED\.|PLC|INC)\.?\s*$', '', 1
    )
  ) AS base_name,
  First(Business_name) AS Business_name
FROM
  business_transactions
GROUP BY base_name
ORDER BY
  base_name, length(Business_name) DESC

In [0]:
%sql
SELECT Business_name, MAX(Date) as Last_Payment
FROM business_transactions_cleaned
GROUP BY Business_name
HAVING MAX(Date) >= '2025-01-01'
ORDER BY Last_Payment DESC
    




In [0]:
%sql
SELECT SUM(Amount_In), SUM(Amount_Out)
FROM business_transactions_cleaned


In [0]:
%sql
SELECT year(Date) as Year, Sum(Amount) Added_Withdrawn
FROM add_withdraw_funds
GROUP BY Year
ORDER BY Year
