In [0]:
import random
from pyspark.sql import SparkSession
import random
import pandas as pd

spark = SparkSession.getActiveSession()

# Read user_ids from dim_user
user_df = spark.table('workspace.csm_project.dim_user').select('user_id').dropna()
user_ids = [row['user_id'] for row in user_df.collect()]
random.shuffle(user_ids)
user_ids = user_ids[:10000]

# Read contracts from dim_contract
contract_df = spark.table('workspace.csm_project.dim_contract').select('contract').dropna()
contracts = [row['contract'] for row in contract_df.collect()]
random.shuffle(contracts)
contracts = contracts[:2000]


In [0]:
import random
import pandas as pd

# Initialize user-contract assignments
data = []
user_contracts = {uid: set() for uid in user_ids}
contract_users = {cid: set() for cid in contracts}

# Assign users to contracts with randomized 1-5 users per contract
for contract in contracts:
    max_users = random.randint(1, 5)
    possible_users = [uid for uid in user_ids if len(user_contracts[uid]) < 5]
    assigned_users = random.sample(possible_users, min(max_users, len(possible_users)))
    for uid in assigned_users:
        user_contracts[uid].add(contract)
        contract_users[contract].add(uid)
        data.append({"user_id": uid, "contract_id": contract})

bridge_df = pd.DataFrame(data)
bridge_df = bridge_df.drop_duplicates()

# Add start_date and end_date columns
bridge_df['start_date'] = pd.to_datetime('2022-04-01')
bridge_df['end_date'] = pd.to_datetime('2026-12-12')

# Randomize is_primary: only one user per contract is primary
bridge_df['is_primary'] = False
for contract in bridge_df['contract_id'].unique():
    contract_users = bridge_df[bridge_df['contract_id'] == contract].index.tolist()
    if contract_users:
        primary_idx = random.choice(contract_users)
        bridge_df.at[primary_idx, 'is_primary'] = True

bridge_df.sort_values(by = 'contract_id').head(10)
bridge_df.info()

In [0]:
from pyspark.sql.functions import col, to_date

# Convert pandas datetime columns to string in 'YYYY-MM-DD' format
bridge_df['start_date'] = bridge_df['start_date'].dt.strftime('%Y-%m-%d')
bridge_df['end_date'] = bridge_df['end_date'].dt.strftime('%Y-%m-%d')

# Convert pandas DataFrame to Spark DataFrame
spark_bridge_df = spark.createDataFrame(bridge_df)

# Cast start_date and end_date to Spark DATE type
spark_bridge_df = spark_bridge_df.withColumn('start_date', to_date(col('start_date'), 'yyyy-MM-dd'))
spark_bridge_df = spark_bridge_df.withColumn('end_date', to_date(col('end_date'), 'yyyy-MM-dd'))

# Write to Delta table
spark_bridge_df.write.mode('overwrite').saveAsTable('workspace.csm_project.user_contract_details')