In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import *

In [None]:
# Storage location settings. `mount_point` is the root under which every
# data path used by this notebook is built.
# NOTE(review): `storage_account_name` and `container_name` are not referenced
# by any other cell in this notebook — confirm where they are consumed.
mount_point = "/mnt/datalake"
container_name = "input"
storage_account_name = "social_support_user_details"

In [None]:
# Directory layout under the mount point:
#   raw/...        - inputs, one folder per source system
#   processed/...  - enriched user data written by this notebook
#   analytics/...  - model output written by this notebook
raw_postgresql_path = mount_point + "/raw/postgresql/users/"
raw_mongodb_path = mount_point + "/raw/mongodb/attachments/"
processed_path = mount_point + "/processed/enriched_user_data/"
analytics_path = mount_point + "/analytics/user_metrics/"

In [None]:
# OAuth client-credentials settings for Azure storage access. Every credential
# is read from a secret scope at run time, so nothing sensitive is hardcoded.
# NOTE(review): "your-scope" looks like a placeholder — confirm the real scope.
# NOTE(review): no other cell in this notebook passes `configs` anywhere
# (for example to a mount call) — confirm the mount is created elsewhere.
_secret_scope = "your-scope"
_client_id = dbutils.secrets.get(scope=_secret_scope, key="client-id")
_client_secret = dbutils.secrets.get(scope=_secret_scope, key="client-secret")
_tenant_id = dbutils.secrets.get(scope=_secret_scope, key="tenant-id")

configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": _client_id,
    "fs.azure.account.oauth2.client.secret": _client_secret,
    # The token endpoint is tenant-specific, so the tenant id is embedded in the URL.
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{_tenant_id}/oauth2/token",
}

# Read data from PostgreSQL

In [None]:
# Load the user records (Parquet files under the raw PostgreSQL path), then
# report the row count, the schema, and a five-row preview.
# NOTE(review): later cells read first_name / last_name from this frame, so the
# preview shows personal data — clear cell outputs before sharing the notebook.
df_users = spark.read.parquet(raw_postgresql_path)

users_count = df_users.count()
print(f"Users count: {users_count}")

df_users.printSchema()
users_preview = df_users.limit(5)
display(users_preview)

# Read data from MongoDB

In [None]:
# Load the attachment records (JSON files under the raw MongoDB path), then
# report the row count, the inferred schema, and a five-row preview.
df_attachments = spark.read.json(raw_mongodb_path)

attachments_count = df_attachments.count()
print(f"Attachments count: {attachments_count}")

df_attachments.printSchema()
attachments_preview = df_attachments.limit(5)
display(attachments_preview)

## Clean and transform user data

In [None]:
# Keep only rows that carry a user_id, then derive:
#   created_date        - created_at converted with to_date
#   full_name           - first_name and last_name joined by a single space
#   processed_timestamp - current_timestamp() at the time the query runs
users_with_id = df_users.filter(col("user_id").isNotNull())

df_users_clean = (
    users_with_id
    .withColumn("created_date", to_date(col("created_at")))
    .withColumn("full_name", concat_ws(" ", col("first_name"), col("last_name")))
    .withColumn("processed_timestamp", current_timestamp())
)

## Transform attachment data

In [None]:
# Keep only attachments that carry a user_id, then derive:
#   attachment_size_mb - size divided by 1024 twice
#                        (assumes size is in bytes — TODO confirm)
#   file_extension     - the text after the last "." in filename
attachments_with_user = df_attachments.filter(col("user_id").isNotNull())

# Capture group 1 is everything after the final dot, up to the end of the name.
extension_of_filename = regexp_extract(col("filename"), r'\.([^.]+)$', 1)

df_attachments_clean = (
    attachments_with_user
    .withColumn("attachment_size_mb", col("size") / 1024 / 1024)
    .withColumn("file_extension", extension_of_filename)
)

## Join user and attachment data

In [None]:
# Left-join attachments onto users by user_id. Every user row is kept; a user
# with several attachments yields one output row per attachment, and a user
# with none yields a single row whose attachment columns are null.
users = df_users_clean.alias("u")
attachments = df_attachments_clean.alias("a")
same_user = col("u.user_id") == col("a.user_id")

# Attachment-side columns to carry over, in output order.
attachment_columns = [
    "attachment_id",
    "filename",
    "attachment_size_mb",
    "file_extension",
    "upload_date",
]

df_enriched = users.join(attachments, same_user, "left").select(
    col("u.*"),
    *[col(f"a.{name}") for name in attachment_columns],
)

## Write enriched data as a Delta table

In [None]:
# Persist the enriched frame in Delta format at processed_path, replacing any
# data already there. The output is partitioned by created_date, and
# mergeSchema is enabled on the write.
enriched_writer = (
    df_enriched.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .partitionBy("created_date")
)
enriched_writer.save(processed_path)

## Create Spark SQL table

In [None]:
# Register the Delta files at processed_path as the SQL table
# `enriched_user_data`. IF NOT EXISTS makes re-running this cell a no-op once
# the table is registered.
create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS enriched_user_data
    USING DELTA
    LOCATION '{processed_path}'
"""
spark.sql(create_table_sql)

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

In [None]:
# `df_processed` was never defined by an earlier cell, so this cell raised a
# NameError on Restart & Run All. Load it explicitly from the Delta table that
# this notebook writes to `processed_path`.
# NOTE(review): this assumes the enriched table carries the feature columns
# selected below — confirm against the users schema.
# NOTE(review): the enriched table holds one row per user/attachment pair, so
# users with several attachments appear more than once — confirm whether the
# training set should be de-duplicated by user_id first.
df_processed = spark.read.format("delta").load(processed_path)

# Model inputs plus the target label, collected to the driver as pandas.
model_columns = [
    "monthly_income", "years_of_experience", "family_size",
    "total_assets", "total_liabilities", "age", "eligibility_label",
]
pandas_df = df_processed.select(*model_columns).toPandas()

In [None]:
# Derived feature: wealth_index = total_assets - total_liabilities.
net_worth = pandas_df["total_assets"] - pandas_df["total_liabilities"]
pandas_df["wealth_index"] = net_worth

In [None]:
# Split the frame into the feature matrix (X) and the target vector (y).
# wealth_index stands in for the raw total_assets / total_liabilities columns.
feature_columns = [
    "monthly_income",
    "years_of_experience",
    "family_size",
    "wealth_index",
    "age",
]
target_column = "eligibility_label"

X = pandas_df[feature_columns]
y = pandas_df[target_column]

In [None]:
# Hold out 20% of the rows for evaluation; the fixed random_state keeps the
# split reproducible between runs.
split_options = {"test_size": 0.2, "random_state": 42}
X_train, X_test, y_train, y_test = train_test_split(X, y, **split_options)

In [None]:
# Random forest of 100 trees, seeded so that training is reproducible,
# fitted on the training split.
forest_options = {"n_estimators": 100, "random_state": 42}
model = RandomForestClassifier(**forest_options)
model.fit(X_train, y_train)

In [None]:
# Predict labels for the held-out rows and report the fraction that match
# the true labels.
y_pred = model.predict(X_test)
test_accuracy = accuracy_score(y_test, y_pred)
print("Accuracy:", test_accuracy)

In [None]:
# Pair each held-out row's monthly_income with its predicted label.
# Only these two columns are carried into the output frame.
prediction_columns = {
    "monthly_income": X_test["monthly_income"],
    "prediction": y_pred,
}
predictions = pd.DataFrame(data=prediction_columns)

In [None]:
# Convert the pandas predictions frame into a Spark DataFrame so that it can
# be written with the Spark writer.
predictions_spark = spark.createDataFrame(data=predictions)

In [None]:
# Persist the predictions in Delta format at analytics_path, replacing any
# data already stored there.
predictions_writer = (
    predictions_spark.write
    .format("delta")
    .mode("overwrite")
)
predictions_writer.save(analytics_path)