In [1]:
#172.18.0.4"
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
import os

In [2]:
CATALOG_URI = "http://nessie:19120/api/v1"  # Nessie Server URI
WAREHOUSE = "s3://warehouse/"               # Minio Address to Write to
STORAGE_URI = "http://172.18.0.3:9000"     # Minio IP address from docker inspect


In [3]:
conf = (
    pyspark.SparkConf()
        .setAppName('billing')
        # Include necessary packages
        .set("spark.sql.debug.maxToStringFields", "100000")
        .set('spark.jars', '''/opt/spark/workjars/iceberg-spark-runtime-3.5_2.12-1.5.0.jar,/opt/spark/workjars/nessie-spark-extensions-3.5_2.12-0.77.1.jar,
        /opt/spark/workjars/bundle-2.24.8.jar,/opt/spark/workjars/url-connection-client-2.24.8.jar''')
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions')
        .set('spark.sql.catalog.nessie', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.nessie.uri', CATALOG_URI)
        .set('spark.sql.catalog.nessie.ref', 'main')
        .set('spark.sql.catalog.nessie.authentication.type', 'NONE')
        .set('spark.sql.catalog.nessie.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog')
        .set('spark.sql.catalog.nessie.s3.endpoint', STORAGE_URI)
        .set('spark.sql.catalog.nessie.warehouse', WAREHOUSE)
        .set('spark.sql.catalog.nessie.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
        .set("spark.executor.memory", "2g")
        .set("spark.driver.memory", "2g")
        .set("spark.executor.memoryOverhead", "512m")
        .set("spark.sql.shuffle.partitions", "64")
        .set("spark.shuffle.spill", "true")
        .set("spark.shuffle.memoryFraction", "0.4")
)

In [4]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# print("Spark Session Started")
# spark = SparkSession.builder.master("local[*]").config(conf=conf).getOrCreate()
print("Spark Session Started")

25/05/08 04:26:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Spark Session Started


In [None]:
spark.sql("DELETE FROM nessie.starschema.ageing_fact WHERE YEAR(BillingMonth) = 2024")

### Adding predictions

In [18]:
import re
import time
from pyspark.sql.functions import lit

csv = "predictions/fault_predictions_finale.csv"
df = spark.read \
    .option("delimiter",",") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(csv)

df.head()

Row(fdr_Id=99, Prediction Probability=0.8832055926322937, Predicted Fault=1, Actual Fault=0)

In [19]:
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.predictions")
df.writeTo("nessie.predictions.fault_predictions_finale").createOrReplace()

In [6]:
spark.stop()

In [102]:
spark.sql("""
    ALTER TABLE nessie.starschema.ageing_fact
    DROP COLUMNS (
`PSC Sub-Department`
    )
""").show()


++
||
++
++



# Testing Cleaning

In [79]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
import re
from pyspark.sql.functions import lit
from pyspark.sql.functions import to_timestamp, regexp_replace
from pyspark.sql import functions as F

## Ageing

Testing Time

In [107]:
import re
import time
from pyspark.sql.functions import lit

csv = "/mnt/HabibData/Ageing_202305.csv"
starttime = time.time()
df = spark.read \
    .option("delimiter", "|") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(csv)
endtime = time.time()
sparktime = endtime-starttime

print("Time taken to read through spark:",sparktime)



Time taken to read through spark: 29.43564248085022


                                                                                

In [108]:
match = re.search(r'_(\d{6})', csv)# Extract month and year from the filename
billing_month = match.group(1) if match else "Unknown"


In [109]:
df = df.withColumn("BillingMonth", lit(billing_month))# Add the BillingMonth column

# Repartition the DataFrame
# df_updated = df_updated.repartition(200)


In [20]:
df.limit(1).show()

+---------------+--------+----------------+---------------------+-------------------+---------------+----------------+--------+------------+-------------+-------------+-------------+------+---+---------+---------+-----+------+----------------+----------------+----------------------+---------------+-----------+----+-----+---------+----+---------------+--------------+------+------------+--------------+------------+-------------+----+----------------+--------------+-------------------+-------------------+------------+------------------+------------+--------------+------------------+---------------+-------------------+-----------------------+-------------+----------------+-----------------------+------+-----------------+-------------------+------------+----------+---------------+---------------+------------------+------------+--------+----+-----------+------------+------------+-------------+-------------+--------------+-------------+--------------+------------+-------------+---------------

In [110]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
import re
from pyspark.sql.functions import lit
from pyspark.sql.functions import to_timestamp, regexp_replace
from pyspark.sql import functions as F
from pyspark.sql.functions import col, to_date, year, month, sum as _sum

In [111]:
def replace_invalid_strings_with_null(df):
    # Iterate over all columns in the DataFrame
    for col_name, dtype in df.dtypes:
        # If the column is of string type, replace invalid values with null
        if dtype == 'string':
            df = df.withColumn(
                col_name,
                F.when(
                    (F.col(col_name) == '.') | 
                    (F.col(col_name) == '-') | 
                    (F.col(col_name) == 'Undefined') | 
                    (F.col(col_name) == 'Not found'),
                    F.lit(None)  # Replace with null
                ).otherwise(F.col(col_name))  # Keep the original value if valid
            )    
    return df

In [112]:
    columns_to_remove = [
        'A <=(366)', 'B (365)-(1)', 'C 0-30', 'D 31-60', 'E 61-90', 'F 91-120', 'G 121-150',
        'H 151-180', 'I 181-210', 'J 211-240', 'K 241-270', 'L 271-300', 'M 301-330', 'N 331-360',
        'O 361-390', 'P 391-420', 'Q 421-450', 'R 451-480', 'S 481-510', 'T 511-540', 'U 541-570',
        'V 571-600', 'W 601-630', 'X 631-660', 'Y 1021-1050', 'Y 1051-1080', 'Y 1081-1110', 'Y 1111-1140',
        'Y 1141-1170', 'Y 1171-1200', 'Y 1201-1230', 'Y 1231-1260', 'Y 1261-1460', 'Y 1461-1825',
        'Y 661-690', 'Y 691-720', 'Y 721-750', 'Y 751-780', 'Y 781-810', 'Y 811-840', 'Y 841-870',
        'Y 871-900', 'Y 901-930', 'Y 931-960', 'Y 961-990', 'Y 991-1020', 'Z >=1826', 'Key Date',
        'Legacy Account Number', 'Legacy Move In Date', 'Contract', 'Move-In Date', 'Move-Out Date',
        'PIBC', 'DCIBC', 'DC OIP', 'DC Rate Category', 'CA Creation Date', 'Contract Creation Date',
        'ConsumerCounter', 'Last SIR Number', 'Last SIR CreatedOn', 'AverageUnits',
        'AverageAmount', 'AdjustedUnits', 'AdjustedAmount', 'AssessedUnits', 'AssessedAmount', 'FICAMonth',
        'OpeningBalance', 'MigrationBalance', 'BankCharges', 'WriteOff', 'PreviousYearAllowance',
        'DownPaymentRequest', 'DownPayment', 'OutstandingDownPayment', 'MNCVAdjustment', 'MNCVPayment',
        'MNCVClearingAmount', 'Set Aside Amount', 'Set Aside Code', 'Installement Number', 'Installement Amount',
        'Agency', 'Schedule Number', 'PSC Consumer IBC', 'Adjustment', 'IBCTransfer', 'ClearingAmount', ''
      'PSC Location','PSC Department','PSC Classification','PSC Ministry','PSC Sub-Department'
        ]
    df_reduced = df.drop(*columns_to_remove)    # Drop the columns from the DataFrame
    

In [113]:
num_columns = len(df_reduced.columns)
print(f"Number of columns: {num_columns}")

Number of columns: 61


In [25]:
df_reduced.count()

                                                                                

4345786

In [114]:
df_filtered = df_reduced.filter(df_reduced.Status == 'ACT')

In [27]:
df_filtered.count()

                                                                                

3542316

In [115]:
df_filtered = df_filtered.drop('Status')

In [29]:
num_columns = len(df_filtered.columns)
print(f"Number of columns: {num_columns}")

Number of columns: 65


In [73]:
df_filtered.dtypes

[('AccountContract', 'string'),
 ('Business Partner', 'string'),
 ('Consumer Number', 'string'),
 ('Contract Account', 'bigint'),
 ('Billing Class', 'string'),
 ('Rate Category', 'string'),
 ('Region', 'string'),
 ('IBC', 'string'),
 ('IBCName', 'string'),
 ('Postal Code', 'string'),
 ('OIP', 'string'),
 ('Phase', 'string'),
 ('Cycle Day', 'string'),
 ('MRU', 'string'),
 ('Sanctioned Load', 'double'),
 ('Connected Load', 'double'),
 ('Last DC Date', 'string'),
 ('Last DC Reason', 'int'),
 ('Premise Type', 'string'),
 ('Customer Name', 'string'),
 ('PMT', 'int'),
 ('PSC Location', 'string'),
 ('PSC Classification', 'string'),
 ('PSC Ministry', 'string'),
 ('PSC Department', 'string'),
 ('PSC Sub-Department', 'string'),
 ('PSC Consumer Region', 'string'),
 ('Strategic/Non-Strategic', 'string'),
 ('Consumer Type', 'string'),
 ('Industry Classification', 'string'),
 ('Last Payment Date', 'string'),
 ('Last Payment Amount', 'double'),
 ('Meter Number', 'string'),
 ('Meter Make', 'string'),


In [116]:
df_updated = df_filtered.withColumn('Last Payment Date', to_timestamp(col('Last Payment Date'), 'dd-MMM-yy'))

In [32]:
df_updated.schema["Last Payment Date"].dataType

TimestampType()

In [117]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY") #important warna error
#CA Creation Date (MM/DD/YYYY), Contract Creation Date(DD/MM/YYYY), Last DC Date (DD-mon-YYYY) (same as last payment)
df_updated.filter(df_updated["Last Payment Date"].isNotNull()).select("Last Payment Date").limit(10).show()

+-------------------+
|  Last Payment Date|
+-------------------+
|2023-01-04 00:00:00|
|2023-05-08 00:00:00|
|2023-05-08 00:00:00|
|2023-02-06 00:00:00|
|2022-05-06 00:00:00|
|2023-05-08 00:00:00|
|2023-05-30 00:00:00|
|2023-05-05 00:00:00|
|2023-05-30 00:00:00|
|2023-05-08 00:00:00|
+-------------------+



In [118]:
# NULL values
df_updated = replace_invalid_strings_with_null(df_updated)

In [119]:
df_updated.filter(df_updated["Region"] == "Undefined").limit(1).show()



+---------------+----------------+---------------+----------------+-------------+-------------+------+---+-------+-----------+---+-----+---------+---+---------------+--------------+------------+--------------+------------+-------------+---+-------------------+-----------------------+-------------+-----------------------+-----------------+-------------------+------------+----------+---------------+------------+--------+---+-----------+------------+------------+-------------+-----------------+------------------+---------------+----------------+------------+-------------+----------------+-----------------+-------------+-------------+--------------+------------+---------+---------+-------+--------------+---------------+--------------+------------+-------------------+-----------------+----------------------+--------------------+
|AccountContract|Business Partner|Consumer Number|Contract Account|Billing Class|Rate Category|Region|IBC|IBCName|Postal Code|OIP|Phase|Cycle Day|MRU|Sanctioned Loa

                                                                                

#### IRB Amount = IRBRevisedAmount+IRBDetectionAmount
#### IRB Units = IRBRevisedUnits+IRBDetectionUnits

In [120]:
from pyspark.sql import functions as F

# Update df_updated by adding two new columns IRBAmount and IRBUnits
df_updated = df_updated.withColumn(
    "IRBAmount", F.col("IRBRevisedAmount") + F.col("IRBDetectionAmount")
).withColumn(
    "IRBUnits", F.col("IRBRevisedUnits") + F.col("IRBDetectionUnits")
)

In [121]:
# Display the first 5 rows where the values are not null and not zero for the specified columns
df_updated.select("CurrentAmount","RegularAmount","IRBAmount", "IRBDetectionAmount", "IRBRevisedAmount") \
    .filter(
        (F.col("IRBAmount").isNotNull()) & 
        (F.col("IRBAmount") != 0) & 
        (F.col("IRBDetectionAmount").isNotNull()) & 
        (F.col("IRBDetectionAmount") != 0) & 
        (F.col("IRBRevisedAmount").isNotNull()) & 
        (F.col("IRBRevisedAmount") != 0)
    ) \
    .limit(5) \
    .show()


[Stage 6:>                                                          (0 + 1) / 1]

+-------------+-------------+------------------+------------------+----------------+
|CurrentAmount|RegularAmount|         IRBAmount|IRBDetectionAmount|IRBRevisedAmount|
+-------------+-------------+------------------+------------------+----------------+
|     14643.39|    113594.53|         -98951.14|              4.61|       -98955.75|
|      2528.52|      3716.89|          -1188.37|           5115.37|        -6303.74|
|    -20001.69|       1899.4|         -21901.09|           1367.44|       -23268.53|
|      45330.7|     12995.33|32335.370000000003|           42764.0|       -10428.63|
|     15137.15|      1757.29|          13379.86|           6214.08|         7165.78|
+-------------+-------------+------------------+------------------+----------------+



                                                                                

In [35]:
# negative_IRBRevisedAmount = df_updated.filter(F.col("IRBRevisedAmount") < 0)

# # Show the rows with negative IRBRevisedAmount
# negative_IRBRevisedAmount.show()

In [36]:
# table = 'ageing_cleaned' 
# spark.sql(f"CREATE NAMESPACE nessie.{table}").show()
# df_updated.writeTo(f"nessie.{table}.{table}_data_raw").createOrReplace()        

In [37]:
# spark.sql("DROP table nessie.ageing_cleaned").show()

In [38]:

   # # df_cleaned.printSchema()
   #  # df_cleaned.show()
   #  table = table+'_cleaned' 
   #  df = df.repartition(200)    # Repartition the DataFrame
   #  try:
   #      if table in [r.namespace for r  in result]:
   #          logging.info(f"Table {table} exists, proceeding with appending data.")
   #          df.writeTo(f"nessie.{table}.{table}_data_raw").append()
   #      else:
   #          logging.info(f"Table {table} does not exist, creating one")
   #          spark.sql(f"CREATE NAMESPACE nessie.{table}").show()
   #          df.writeTo(f"nessie.{table}.{table}_data_raw").createOrReplace()        
   #  except Exception as e:
   #      logging.info(f"Data could not be added: {str(e)}")



## Transformation

In [122]:
from pyspark.sql.functions import col, to_date, year, month, sum as _sum

# Assuming `df` is your original flat dataframe
    # billing_month is between April and September

# Parse Last Payment Date to extract month and year for grouping
df_updated = df_updated.withColumn("LastPaymentDate", col("Last Payment Date").cast("date"))
df_updated = df_updated.withColumn("PaymentYear", year(col("LastPaymentDate")))
df_updated = df_updated.withColumn("PaymentMonth", month(col("LastPaymentDate")))


In [123]:
#add billing year and billing month 
from pyspark.sql import functions as F

# Assuming the 'billing_month' column is in the form YYYYMM as an integer
df_updated = df_updated.withColumn(
    "Year", F.substring(F.col("billingmonth").cast("string"), 1, 4).cast("int")
).withColumn(
    "Month", F.substring(F.col("billingmonth").cast("string"), 5, 2).cast("int")
)

# Show the updated DataFrame
df_updated.limit(1).show()


+--------------------+----------------+---------------+----------------+-------------+-------------+------+---+---------+-----------+----+-----+---------+----+---------------+--------------+------------+--------------+------------+--------------+----+-------------------+-----------------------+-------------+-----------------------+-----------------+-------------------+------------+----------+---------------+------------+--------+----+-----------+------------+------------+-------------+-----------------+------------------+---------------+----------------+------------+-------------+----------------+-----------------+-------------+-------------+--------------+------------+---------+---------+-------+--------------+---------------+--------------+------------+-------------------+-----------------+----------------------+--------------------+---------+--------+---------------+-----------+------------+----+-----+
|     AccountContract|Business Partner|Consumer Number|Contract Account|Billing C

In [42]:
df_updated.select("billingmonth","Year","Month").limit(1).show()

+------------+----+-----+
|billingmonth|Year|Month|
+------------+----+-----+
|  2023-04-01|2023|    4|
+------------+----+-----+



In [160]:
len(df_updated.columns)

72

In [124]:
#keepbillingmonth in date format for laying out charts
from pyspark.sql.functions import to_date, col, lpad, lit, concat

# Overwrite BillingMonth with a proper date (e.g., 2023-04-01)
df_updated = df_updated.withColumn(
    "BillingMonth",
    to_date(concat(lpad(col("BillingMonth").cast("string"), 6, "0"), lit("01")), "yyyyMMdd")
)


In [43]:
spark.sql("DROP TABLE nessie.starschema.ageing_fact")

DataFrame[]

In [44]:
# 1. Ageing_Fact
# save all 65 columns
# spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.starschema")
start = time.time()
df_updated.writeTo("nessie.starschema.ageing_fact").createOrReplace()
print("Time taken to append first batch: ",time.time()-start)

                                                                                

Time taken to append first batch:  46.322553396224976


#### Merge with crm's consumer dim

#### if ageingdata.month>3 and ageingdata.month<10 only valid for months of apr-sep

In [167]:
if 4 <= int(billing_month[4:6]) <= 9:
    print("yes")


yes


##### Consumer Handling

In [125]:
# crm
# Total complaints
# Most Occured Subject 

# Average Complaint Resolution Time (hrs)
# oms
# Average Fault Duration Time (hrs)


from pyspark.sql import functions as F
from pyspark.sql.functions import col
#for actual use it will be the data that we just read and cleaned
ageing_new_data = df_updated
#for testing read from nessie
# ageing_new_data = spark.read.table("nessie.starschema.ageing_fact")
# TEMP
crm_consumer_dim_ = spark.read.table("nessie.starschema.crm_consumer_dim")


In [126]:
# 2. Consumer_Dim
# 2. IBC_Dim: Group by IBCCode and other IBC-related fields
# ageing_new_data = df_updated
# Perform the inner join between ageing_new_data and crm_consumer_dim_ on BillingMonth, Year, and AccountContract
consumer_ageing_crm_dim = ageing_new_data.alias("ageing").join(
    crm_consumer_dim_.alias("crm"), 
    (F.col("ageing.AccountContract") == F.col("crm.AccountContract")) &
    (F.col("ageing.Month") == F.col("crm.Month")) &
    (F.col("ageing.Year") == F.col("crm.Year")),
    how='inner'
).groupBy("ageing.AccountContract","ageing.Month","ageing.Year").agg(
    # First value of relevant Ageing columns (from ageing_new_data)
    F.first("ageing.BillingMonth").alias("Billing Month"),
    F.first("ageing.Business Partner").alias("Business_Partner"),
    F.first("ageing.Consumer Type").alias("ConsumerType"),
    F.first("ageing.Customer Name").alias("ConsumerName"),
    F.first("ageing.Postal Code").alias("PostalCode"),
    F.first("ageing.Billing Class").alias("BillingClass"),
    F.first("ageing.Phase").alias("Phase"),
    F.first("ageing.OIP").alias("OIP"),
    F.first("ageing.Premise Type").alias("Premise_Type"),
    F.first("ageing.IBCName").alias("IBCName"),
    F.first("ageing.IBC").alias("IBC_Code"),
    F.first("ageing.BCM").alias("Billing Charge Mode (BCM)"),
    # First value of relevant crm columns (from crm_consumer_dim)
    # F.first("crm.Month").alias("Month"), not needed cuz already contained in group by
    # F.first("crm.Year").alias("Year"),
    F.first("crm.StarCustomer").alias("StarCustomer"),

    # Not taking aggregates from ageing because each account contract is unique in a month
    F.first("ageing.IRBAmount").alias("IRB Amount"),
    F.first("ageing.IRBUnits").alias("IRB Units"),
    F.first("ageing.Sanctioned Load").alias("Sanctioned Load"),
    F.first("ageing.Connected Load").alias("Connected Load"),
    F.first("ageing.CurrentAmount").alias("Current Amount"),
    F.first("ageing.CurrentUnits").alias("Current Units"),
    
    # Aggregated values from CRM
    F.first("crm.Complaint Counts (Apr-Sep_23-24)").alias("Complaint Counts"),
    F.first("crm.Average Complaint Resolution Time (mins)").alias("Average Complaint Resolution Time (mins)")
)

# Show the result

In [127]:
consumer_ageing_crm_dim.limit(15).show()

[Stage 13:>                                                       (0 + 12) / 13]

+--------------------+-----+----+-------------+----------------+--------------------+--------------------+----------+------------+----------+---+--------------------+-----------+--------+-------------------------+-------------+----------+---------+---------------+--------------+--------------+-------------+----------------+----------------------------------------+
|     AccountContract|Month|Year|Billing Month|Business_Partner|        ConsumerType|        ConsumerName|PostalCode|BillingClass|     Phase|OIP|        Premise_Type|    IBCName|IBC_Code|Billing Charge Mode (BCM)| StarCustomer|IRB Amount|IRB Units|Sanctioned Load|Connected Load|Current Amount|Current Units|Complaint Counts|Average Complaint Resolution Time (mins)|
+--------------------+-----+----+-------------+----------------+--------------------+--------------------+----------+------------+----------+---+--------------------+-----------+--------+-------------------------+-------------+----------+---------+---------------+--

                                                                                

In [70]:
spark.sql("DROP TABLE nessie.starschema.consumer_ageing_crm_dim")

DataFrame[]

In [128]:
consumer_ageing_crm_dim.writeTo("nessie.starschema.consumer_ageing_crm_dim").append()

                                                                                

In [133]:
spark.stop()

In [None]:
# ###### For other aggregates:
# # 2. Consumer_Dim
# # 2. IBC_Dim: Group by IBCCode and other IBC-related fields
# # ageing_new_data = df_updated
# # Perform the inner join between ageing_new_data and crm_consumer_dim_ on BillingMonth, Year, and AccountContract
# consumer_ageing_crm_dim = ageing_new_data.alias("ageing").join(
#     crm_consumer_dim_.alias("crm"), 
#     (F.col("ageing.AccountContract") == F.col("crm.AccountContract")) &
#     (F.col("ageing.Month") == F.col("crm.Month")) &
#     (F.col("ageing.Year") == F.col("crm.Year")),
#     how='inner'
# ).groupBy("ageing.AccountContract","ageing.Month","ageing.Year").agg(
#     # First value of relevant Ageing columns (from ageing_new_data)
#     F.first("ageing.BillingMonth").alias("Billing Month"),
#     F.first("ageing.Business Partner").alias("Business_Partner"),
#     F.first("ageing.Consumer Type").alias("ConsumerType"),
#     F.first("ageing.Customer Name").alias("ConsumerName"),
#     F.first("ageing.Postal Code").alias("PostalCode"),
#     F.first("ageing.Billing Class").alias("BillingClass"),
#     F.first("ageing.Phase").alias("Phase"),
#     F.first("ageing.OIP").alias("OIP"),
#     F.first("ageing.Premise Type").alias("Premise_Type"),
#     F.first("ageing.IBCName").alias("IBCName"),
#     F.first("ageing.IBC").alias("IBC_Code"),
#     # First value of relevant Ageing columns (from crm_consumer_dim)
#     # F.first("crm.Month").alias("Month"), not needed cuz already contained in group by
#     # F.first("crm.Year").alias("Year"),
#     F.first("crm.StarCustomer").alias("StarCustomer"),

#     # Aggregations from Ageing Data (IRB related columns)
#     F.sum("ageing.IRBAmount").alias("Total IRB Amount"),
#     F.sum("ageing.IRBUnits").alias("Total IRB Units"),
#     F.avg("ageing.IRBAmount").alias("Average IRB Amount"),
#     F.avg("ageing.IRBUnits").alias("Average IRB Units"),
#     # Not taking aggregates from ageing because each account contract is unique in a month
#     F.first("ageing.Sanctioned Load").alias("Total Sanctioned Load"),
#     F.sum("ageing.Connected Load").alias("Total Connected Load"),
#     F.avg("ageing.Sanctioned Load").alias("Average Sanctioned Load"),
#     F.avg("ageing.Connected Load").alias("Average Connected Load"),
    
#     F.sum("ageing.CurrentAmount").alias("Total Current Amount"),
#     F.avg("ageing.CurrentAmount").alias("Average Current Amount"),
#     F.sum("ageing.CurrentUnits").alias("Total Current Units"),
#     F.avg("ageing.CurrentUnits").alias("Average Current Units"),
    
#     # Aggregated values from CRM
#     F.first("crm.Complaint Counts (Apr-Sep_23-24)").alias("Complaint Counts"),
#     F.first("crm.Average Complaint Resolution Time (mins)").alias("Average Complaint Resolution Time (mins)")
# )

# # Show the result

#### IBC Handling

In [49]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col
#for actual use it will be the data that we just read and cleaned
ageing_new_data = df_updated
#TEST ONLY 
# ageing_new_data = spark.read.table("nessie.starschema.ageing_fact")
ibc_crm_dim_ = spark.read.table("nessie.starschema.ibc_crm_dim")


##### With just crm, 33 ibcs

In [50]:
# 2. Consumer_Dim
# 2. IBC_Dim: Group by IBCCode and other IBC-related fields
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Perform the inner join between ageing_new_data and crm_consumer_dim_ on BillingMonth, Year, and AccountContract
ibc_crm_ageing_dim_33 = ageing_new_data.alias("ageing").join(
    ibc_crm_dim_.alias("ibc_crm"), 
    (F.col("ageing.IBC") == F.col("ibc_crm.IBCCode")) &
    (F.col("ageing.Month") == F.col("ibc_crm.Month")) &
    (F.col("ageing.Year") == F.col("ibc_crm.Year")),
    how='inner'
).groupBy("ageing.IBC", "ageing.Month", "ageing.Year").agg(
    # First value of relevant Ageing columns (from ageing_new_data)
    F.first("ageing.IBCName").alias("IBC Name"),
    F.first("ageing.Region").alias("Region"),
    F.first("ageing.BillingMonth").alias("Billing Month"),
    
    # Aggregations from Ageing Data (IRB related columns)
    F.first("ibc_crm.Complaint Counts").alias("Complaint Counts"),
    F.first("ibc_crm.Average Complaint Resolution Time (mins)").alias("Average Complaint Resolution Time (mins)"),
    
    F.sum("ageing.CurrentAmount").alias("Total Current Amount"),
    F.avg("ageing.CurrentAmount").alias("Average Current Amount"),
    F.sum("ageing.CurrentUnits").alias("Total Current Units"),
    F.avg("ageing.CurrentUnits").alias("Average Current Units"),

    F.sum("ageing.Sanctioned Load").alias("Total Sanctioned Load"),
    F.avg("ageing.Sanctioned Load").alias("Average Sanctioned Load"),
    F.sum("ageing.Connected Load").alias("Total Connected Load"),
    F.avg("ageing.Connected Load").alias("Average Connected Load"),
    
    F.sum("ageing.IRBAmount").alias("Total IRB Amount"),
    F.avg("ageing.IRBAmount").alias("Average IRB Amount"),
    F.sum("ageing.IRBUnits").alias("Total IRB_Units"),
    F.avg("ageing.IRBUnits").alias("Average IRB Units"),
    # Aggregated values from CRM
    # F.first("crm.Month").alias("Month"),
    # F.first("crm.Year").alias("Year"),
)

# Show the result
# ibc_crm_ageing_dim_33.limit(1).show()

In [52]:
# spark.sql("DROP TABLE nessie.starschema.ibc_crm_ageing_dim_33")

In [53]:
ibc_crm_ageing_dim_33.writeTo("nessie.starschema.ibc_crm_ageing_dim_33").createOrReplace()
# ibc_crm_ageing_dim_33.writeTo("nessie.starschema.ibc_crm_ageing_dim_33").append()

                                                                                

##### Now all three (ageing crm oms) joined 4 ibcs

In [54]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col
#for actual use it will be the data that we just read and cleaned
ageing_new_data = df_updated
#TEST ONLY 
# ageing_new_data = spark.read.table("nessie.starschema.ageing_fact")
ibc_oms_crm_dim = spark.read.table("nessie.starschema.ibc_oms_crm_dim")

In [55]:
# 2. Consumer_Dim
# 2. IBC_Dim: Group by IBCCode and other IBC-related fields
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Perform the inner join between ageing_new_data and crm_consumer_dim_ on BillingMonth, Year, and AccountContract
ibc_all_dim_4 = ageing_new_data.alias("ageing").join(
    ibc_oms_crm_dim.alias("ibc_crm_oms"), 
    (F.col("ageing.IBC") == F.col("ibc_crm_oms.IBCCode")) &
    (F.col("ageing.Month") == F.col("ibc_crm_oms.Month")) &
    (F.col("ageing.Year") == F.col("ibc_crm_oms.Year")),
    how='inner'
).groupBy("ageing.IBC", "ageing.Month", "ageing.Year").agg(
    # First value of relevant Ageing columns (from ageing_new_data)
    F.first("ageing.IBCName").alias("IBC Name"),
    F.first("ageing.Region").alias("Region"),
    F.first("ageing.BillingMonth").alias("Billing Month"),
    # Aggregations are already calculated in oms crm dim
    F.first("ibc_crm_oms.Complaint Counts").alias("Complaint Counts"),
    F.first("ibc_crm_oms.Average Complaint Resolution Time (mins)").alias("Average Complaint Resolution Time (mins)"),
    F.first("ibc_crm_oms.Total Outages/Faults").alias("Total Outages/Faults"),
    F.first("ibc_crm_oms.Most_Occurred_Fault").alias("Most Occurred Fault"),
    F.first("ibc_crm_oms.Most_Occurred_Fault_Frequency").alias("Most Occurred Fault Frequency"),
    F.first("ibc_crm_oms.Average Fault Turn-Around Time (mins)").alias("Average Fault Turn-Around Time (mins)"),
    F.first("ibc_crm_oms.Average Fault Duration (mins)").alias("Average Fault Duration (mins)"),
    F.first("ibc_crm_oms.Rain Frequency").alias("Rain Frequency"),

    # Aggregations from Ageing Data (IRB related columns)
    
    F.sum("ageing.CurrentAmount").alias("Total Current Amount"),
    F.avg("ageing.CurrentAmount").alias("Average Current Amount"),
    F.sum("ageing.CurrentUnits").alias("Total Current Units"),
    F.avg("ageing.CurrentUnits").alias("Average Current Units"),

    F.sum("ageing.Sanctioned Load").alias("Total Sanctioned Load"),
    F.avg("ageing.Sanctioned Load").alias("Average Sanctioned Load"),
    F.sum("ageing.Connected Load").alias("Total Connected Load"),
    F.avg("ageing.Connected Load").alias("Average Connected Load"),
    
    F.sum("ageing.IRBAmount").alias("Total IRB Amount"),
    F.avg("ageing.IRBAmount").alias("Average IRB Amount"),
    F.sum("ageing.IRBUnits").alias("Total IRB_Units"),
    F.avg("ageing.IRBUnits").alias("Average IRB Units"),
    # Aggregated values from CRM
    # F.first("crm.Month").alias("Month"),
    # F.first("crm.Year").alias("Year"),
)

# Show the result
# ibc_all_dim_4.limit(1).show()

In [56]:
spark.sql("DROP TABLE nessie.starschema.ibc_all_dim_4")

DataFrame[]

In [57]:
ibc_all_dim_4.writeTo("nessie.starschema.ibc_all_dim_4").createOrReplace()

                                                                                

#### Now IBC OMS x Ageing

In [91]:
ibc_oms_dim = spark.read.table("nessie.starschema.ibc_oms_dim")

In [59]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Perform the inner join between ageing_new_data and crm_consumer_dim_ on BillingMonth, Year, and AccountContract
ibc_oms_ageing_dim_4 = ageing_new_data.alias("ageing").join(
    ibc_oms_dim.alias("ibc_oms"), 
    (F.col("ageing.IBCName") == F.col("ibc_oms.IBC")) & #oms does not have ibc codes
    (F.col("ageing.Month") == F.col("ibc_oms.Month")) &
    (F.col("ageing.Year") == F.col("ibc_oms.Year")),
    how='inner'
).groupBy("ageing.IBC", "ageing.Month", "ageing.Year").agg(
    # First value of relevant Ageing columns (from ageing_new_data)
    F.first("ageing.IBCName").alias("IBC Name"),
    F.first("ageing.Region").alias("Region"),
    F.first("ageing.BillingMonth").alias("Billing Month"),
    # Aggregations are already calculated in oms ibc dim
    F.first("ibc_oms.Total Outages/Faults").alias("Total Outages/Faults"),
    F.first("ibc_oms.Most_Occurred_Fault").alias("Most Occurred Fault"),
    F.first("ibc_oms.Most_Occurred_Fault_Frequency").alias("Most Occurred Fault Frequency"),
    F.first("ibc_oms.Average Fault Turn-Around Time (mins)").alias("Average Fault Turn-Around Time (mins)"),
    F.first("ibc_oms.Average Fault Duration (mins)").alias("Average Fault Duration (mins)"),
    F.first("ibc_oms.Rain Frequency").alias("Rain Frequency"),

    # Aggregations from Ageing Data (IRB related columns)
    
    F.sum("ageing.CurrentAmount").alias("Total Current Amount"),
    F.avg("ageing.CurrentAmount").alias("Average Current Amount"),
    F.sum("ageing.CurrentUnits").alias("Total Current Units"),
    F.avg("ageing.CurrentUnits").alias("Average Current Units"),

    F.sum("ageing.Sanctioned Load").alias("Total Sanctioned Load"),
    F.avg("ageing.Sanctioned Load").alias("Average Sanctioned Load"),
    F.sum("ageing.Connected Load").alias("Total Connected Load"),
    F.avg("ageing.Connected Load").alias("Average Connected Load"),
    
    F.sum("ageing.IRBAmount").alias("Total IRB Amount"),
    F.avg("ageing.IRBAmount").alias("Average IRB Amount"),
    F.sum("ageing.IRBUnits").alias("Total IRB_Units"),
    F.avg("ageing.IRBUnits").alias("Average IRB Units"),
    # Aggregated values from CRM
    # F.first("crm.Month").alias("Month"),
    # F.first("crm.Year").alias("Year"),
)

# Show the result
# ibc_oms_ageing_dim_4.limit(5).show(truncate=False)

In [61]:
spark.sql("DROP TABLE nessie.starschema.ibc_oms_ageing_dim_4")

DataFrame[]

In [62]:
ibc_oms_ageing_dim_4.writeTo("nessie.starschema.ibc_oms_ageing_dim_4").createOrReplace()

                                                                                

### PMT CRM Ageing

In [263]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col
#for actual use it will be the data that we just read and cleaned
ageing_new_data = df_updated
#TEST ONLY 
# ageing_new_data = spark.read.table("nessie.starschema.ageing_fact")
pmt_crm_dim = spark.read.table("nessie.starschema.pmt_crm_dim")

In [64]:
# 2. Consumer_Dim
# 2. IBC_Dim: Group by IBCCode and other IBC-related fields
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Perform the inner join between ageing_new_data and crm_consumer_dim_ on BillingMonth, Year, and AccountContract
pmt_crm_ageing_dim = ageing_new_data.alias("ageing").join(
    pmt_crm_dim.alias("pmt_crm"), 
    (F.col("ageing.PMT") == F.col("pmt_crm.PMT")) &
    (F.col("ageing.Month") == F.col("pmt_crm.Month")) &
    (F.col("ageing.Year") == F.col("pmt_crm.Year")),
    how='inner'
).groupBy("ageing.PMT", "ageing.Month", "ageing.Year").agg(
    # First value of relevant Ageing columns (from ageing_new_data)
    F.first("pmt_crm.PMT Name").alias("PMT Name"),    
    F.first("ageing.IBC").alias("IBC Code"),
    F.first("ageing.IBCName").alias("IBC Name"),
    F.first("ageing.Region").alias("Region"),
    F.first("ageing.BillingMonth").alias("Billing Month"),
    # Aggregations are already calculated in oms crm dim
    F.first("pmt_crm.Complaint Counts").alias("Complaint Counts"),
    F.first("pmt_crm.Average Complaint Resolution Time (mins)").alias("Average Complaint Resolution Time (mins)"),

    # Aggregations from Ageing Data (IRB related columns)
    
    F.sum("ageing.CurrentAmount").alias("Total Current Amount"),
    F.avg("ageing.CurrentAmount").alias("Average Current Amount"),
    F.sum("ageing.CurrentUnits").alias("Total Current Units"),
    F.avg("ageing.CurrentUnits").alias("Average Current Units"),

    F.sum("ageing.Sanctioned Load").alias("Total Sanctioned Load"),
    F.avg("ageing.Sanctioned Load").alias("Average Sanctioned Load"),
    F.sum("ageing.Connected Load").alias("Total Connected Load"),
    F.avg("ageing.Connected Load").alias("Average Connected Load"),
    
    F.sum("ageing.IRBAmount").alias("Total IRB Amount"),
    F.avg("ageing.IRBAmount").alias("Average IRB Amount"),
    F.sum("ageing.IRBUnits").alias("Total IRB_Units"),
    F.avg("ageing.IRBUnits").alias("Average IRB Units"),
    # Aggregated values from CRM
    # F.first("crm.Month").alias("Month"),
    # F.first("crm.Year").alias("Year"),
)

# Show the result
# pmt_crm_ageing_dim.limit(1).show()

In [65]:
spark.sql("DROP TABLE nessie.starschema.pmt_crm_ageing_dim")
pmt_crm_ageing_dim.writeTo("nessie.starschema.pmt_crm_ageing_dim").createOrReplace()
# pmt_crm_ageing_dim.writeTo("nessie.starschema.pmt_crm_ageing_dim").append()

                                                                                

In [189]:
spark.stop()

## CRM 

In [190]:
# spark = SparkSession.builder.master("local[*]").config(conf=conf).getOrCreate()
# print("Spark Session Started")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# print("Spark Session Started")
# spark = SparkSession.builder.master("local[*]").config(conf=conf).getOrCreate()
print("Spark Session Started")

Spark Session Started


In [191]:
import re
from pyspark.sql.functions import lit
csv = "/mnt/HabibData/CRM_202301-202501.csv"
# match = re.search(r'_(\d{6})', csv)# Extract month and year from the filename
# billing_month = match.group(1) if match else "Unknown"

# Read the CSV file into a DataFrame
crm_df = spark.read \
    .option("delimiter", "|") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(csv)

                                                                                

In [7]:
crm_df.limit(1).show()

+--------------------+----------+----------------+--------------------+----------+------------+----------+------------------+------------------+------------+--------------------+--------------------+-------+-----------------+------------------+------------------+---------------------------+------------------+------------------+---------------------+--------------------+-------------------+-------+------------+-----------+----------+------+----------+---------+----+-------------------+-----------+----------+------------+----------------+--------------------+-----------------+--------+--------+---------------+----+-----+---------+---------+-----------+----------+-------------------+----------+-----------+-----------------------+----------------+--------+---+---------+-----------+-------------+---------+-------------+-------------+-------------+--------------------+--------------+---------+------+------+-------------+---------+------------+-------------+---------------+-------------+-----

In [192]:
# NULL values
columns_to_remove = [
      "LastDCDate",
    "LastDCRecovery",
    "HTDispatchedDateTime",
    "ForwardedToHTDateTime",
    "Key",
    "LastSIRCreatedOn",
    "Remarks",
    "ClosedOnInteractionDateTime",
    "CompletionRemarks",
    "%IntervalKey",
    "MNCcode",
    "Subject Group",
    "Contract",
    "Contract Account",
    "GroundTATNum"]# fazool hai
    
crm_df_reduced = crm_df.drop(*columns_to_remove)    # Drop the columns from the DataFrame

In [86]:
num_columns = len(crm_df_reduced.columns)
print(f"Number of columns: {num_columns}")

Number of columns: 67


In [193]:
crm_df_reduced = crm_df_reduced.filter(crm_df_reduced.ConsumerStatus == 'ACT')

In [194]:
crm_df_reduced = crm_df_reduced.drop('CustomerStatus')    # Drop the columns from the DataFrame

In [265]:

ageing_new_data.dtypes

[('AccountContract', 'string'),
 ('Business Partner', 'string'),
 ('Consumer Number', 'string'),
 ('Contract Account', 'bigint'),
 ('Billing Class', 'string'),
 ('Rate Category', 'string'),
 ('Region', 'string'),
 ('IBC', 'string'),
 ('IBCName', 'string'),
 ('Postal Code', 'string'),
 ('OIP', 'string'),
 ('Phase', 'string'),
 ('Cycle Day', 'string'),
 ('MRU', 'string'),
 ('Sanctioned Load', 'double'),
 ('Connected Load', 'double'),
 ('Last DC Date', 'string'),
 ('Last DC Reason', 'int'),
 ('Premise Type', 'string'),
 ('Customer Name', 'string'),
 ('PMT', 'int'),
 ('PSC Consumer Region', 'string'),
 ('Strategic/Non-Strategic', 'string'),
 ('Consumer Type', 'string'),
 ('Industry Classification', 'string'),
 ('Last Payment Date', 'timestamp'),
 ('Last Payment Amount', 'double'),
 ('Meter Number', 'string'),
 ('Meter Make', 'string'),
 ('Device Category', 'string'),
 ('BillingMonth', 'date'),
 ('BillType', 'string'),
 ('BCM', 'string'),
 ('NormalUnits', 'double'),
 ('NormalAmount', 'doubl

In [90]:
# df_reduced.count()
crm_df.dtypes

[('AccountContract', 'decimal(20,0)'),
 ('Ticket No.', 'bigint'),
 ('Business_Partner', 'string'),
 ('GUID', 'string'),
 ('IBCName', 'string'),
 ('PROCESS_TYPE', 'string'),
 ('CREATED_BY', 'string'),
 ('CREATED_AT', 'string'),
 ('UDATETIME', 'string'),
 ('LatestStatus', 'string'),
 ('Supervisor', 'string'),
 ('Lineman', 'string'),
 ('MTLNo', 'string'),
 ('CompletionRemarks', 'string'),
 ('CompletedDateTime', 'string'),
 ('ClosedDateTime', 'string'),
 ('ClosedOnInteractionDateTime', 'string'),
 ('InProcessDateTime', 'string'),
 ('OpenDateTime', 'string'),
 ('ForwardedToHTDateTime', 'string'),
 ('HTDispatchedDateTime', 'string'),
 ('Medium Of Complaint', 'string'),
 ('IBCCode', 'string'),
 ('CREATED_DATE', 'string'),
 ('Source', 'string'),
 ('Created By', 'string'),
 ('Region', 'string'),
 ('Subject_ID', 'string'),
 ('Reason_ID', 'string'),
 ('Key', 'string'),
 ('Subject Group', 'string'),
 ('Reason', 'string'),
 ('Subject', 'string'),
 ('GroundTATHrs', 'string'),
 ('GroundTATNum', 'doub

In [195]:
crm_df.select('GroundTATNum').limit(5).show()

+-----------------+
|     GroundTATNum|
+-----------------+
| 0.13442129629402|
|0.034247685187438|
| 0.32202546296321|
| 0.76576388889225|
|  1.2027893518534|
+-----------------+



In [196]:
crm_df_reduced = crm_df_reduced.withColumn("AccountContract", F.col("AccountContract").cast("string"))

### dividing date and time

In [197]:
from pyspark.sql.functions import to_date, to_timestamp, col, date_format, lit, concat

In [198]:
for i in [  "UDATETIME",
    
    "CompletedDateTime",
"ClosedDateTime",
 "InProcessDateTime",
"OpenDateTime",
"CREATED_AT"
         ]:
    # Apply the function to split the datetime
    crm_df_reduced = crm_df_reduced.withColumn(i, to_timestamp(col(i), 'dd-MMM-yy HH:mm:ss'))


# Show the updated dataframe
crm_df_reduced.limit(1).show()

+--------------------+----------+----------------+--------------------+----------+------------+----------+-------------------+-------------------+------------+--------------------+--------------------+-------+-------------------+-------------------+-------------------+-------------------+-------------------+-------+------------+-----------+----------+------+----------+---------+-----------+----------+------------+--------------------+-----------------+--------+--------+---------------+----+-----+---------+---------+-----------+----------+----------+-----------+-----------------------+---+-----------+-------------+---------+-------------+-------------+-------------+--------------------+--------------+---------+------+------+-------------+---------+------------+-------------+---------------+-------------+------------+---------+---------+-------------------+-----------------+---------------+----------+
|     AccountContract|Ticket No.|Business_Partner|                GUID|   IBCName|PROCE

In [199]:
datesinMonthformat = ["CREATED_DATE"]
for date in datesinMonthformat:
    crm_df_reduced = crm_df_reduced.withColumn(date, to_timestamp(col(date), 'dd-MMM-yy'))
    print(crm_df_reduced.schema[date].dataType)

TimestampType()


In [200]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
#CA Creation Date (MM/DD/YYYY), Contract Creation Date(DD/MM/YYYY), Last DC Date (DD-mon-YYYY) (same as last payment)
crm_df_reduced.filter(crm_df_reduced["CREATED_DATE"].isNotNull()).select("CREATED_DATE").limit(10).show()

+-------------------+
|       CREATED_DATE|
+-------------------+
|2024-04-18 00:00:00|
|2024-07-17 00:00:00|
|2024-08-21 00:00:00|
|2023-08-21 00:00:00|
|2023-06-22 00:00:00|
|2023-06-22 00:00:00|
|2024-05-02 00:00:00|
|2023-08-21 00:00:00|
|2024-05-16 00:00:00|
|2023-04-06 00:00:00|
+-------------------+



In [201]:
# getting rid of invalid entries before updating casting dates and duration hours 

In [202]:
#strings that contain both date and time

In [203]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

In [204]:
# Define the function to convert hh:mm:ss to minutes
def hhmmss_to_minutes(time_str):
    try:
        if type(time_str)!=str:
            return time_str
        h, m, s = map(int, time_str.split(":"))
        return h * 60 + m + s / 60
    except Exception as e:
        return None  # In case of an error (e.g., non-time values)
# Register the function as a UDF (User Defined Function)
hhmmss_to_minutes_udf = udf(hhmmss_to_minutes, FloatType())

In [205]:
hrstomins = ["GroundTATHrs", "TimeElapsedCompleted", "TimeElapsedClosed","O to I","I to C"]
for j in hrstomins:
    crm_df_reduced = crm_df_reduced.withColumn(j + "(minutes)", hhmmss_to_minutes_udf(col(j)))
    # df_reduced = df_reduced.withColumn(i, to_timestamp(col(i), 'dd-MMM-yy HH:mm:ss'))
columns_to_show = [col + "(minutes)" for col in hrstomins]
crm_df_reduced.select(columns_to_show).limit(5).show()

[Stage 5:>                                                          (0 + 1) / 1]

+---------------------+-----------------------------+--------------------------+---------------+---------------+
|GroundTATHrs(minutes)|TimeElapsedCompleted(minutes)|TimeElapsedClosed(minutes)|O to I(minutes)|I to C(minutes)|
+---------------------+-----------------------------+--------------------------+---------------+---------------+
|            193.56667|                    193.56667|                 793.63336|       91.21667|         101.45|
|            49.316666|                    49.316666|                  529.4167|      11.916667|           37.4|
|                 89.7|                         89.7|                 689.76666|      13.566667|       74.78333|
|            116.86667|                    116.86667|                    766.45|      14.366667|     100.916664|
|                 91.7|                         91.7|                    2025.5|       4.116667|      85.583336|
+---------------------+-----------------------------+--------------------------+---------------+

                                                                                

In [206]:
crm_df_reduced = crm_df_reduced.drop(*hrstomins)    # Drop the columns from the DataFrame

## cleaning before handling target TAT because its string 

In [103]:
crm_df_reduced = replace_invalid_strings_with_null(crm_df_reduced)

In [104]:
def convert_column_to_double(df, column_name):
    # Convert the specified column to double (float)
    df_updated = df.withColumn(column_name, col(column_name).cast("double"))
    return df_updated

In [215]:
crm_df_reduced = convert_column_to_double(crm_df_reduced, "TargetTAT")

In [216]:
print(crm_df_reduced.schema["TargetTAT"].dataType)

DoubleType()


In [217]:
doubletohours = [
"GroundTAT",
"TargetTAT"]
# removed "GroundTATNum",

In [218]:

# Function to convert a double (hours) to minutes
def hours_to_minutes(hours):
    try:
        return hours * 60
    except Exception as e:
        return None  # In case of any invalid value

# Register the function as a UDF (User Defined Function)
hours_to_minutes_udf = udf(hours_to_minutes, FloatType())

In [220]:
for k in doubletohours:
    crm_df_reduced = crm_df_reduced.withColumn(k + "(minutes)", hours_to_minutes_udf(col(k)))

# Show the updated dataframe with the new converted column
columns_to_show = [col + "(minutes)" for col in doubletohours]
crm_df_reduced.select(columns_to_show).limit(5).show()

+------------------+------------------+
|GroundTAT(minutes)|TargetTAT(minutes)|
+------------------+------------------+
|             193.8|             255.0|
|              49.2|             300.0|
|              89.4|              NULL|
|             117.0|              NULL|
|              91.8|              NULL|
+------------------+------------------+



In [221]:
crm_df_reduced.filter(crm_df_reduced["TargetTAT"].isNotNull()).select("TargetTAT").limit(5).show()

+---------+
|TargetTAT|
+---------+
|     4.25|
|      5.0|
|     2.75|
|      3.5|
|      5.0|
+---------+



In [111]:
crm_df_reduced = crm_df_reduced.drop(*doubletohours)    # Drop the columns from the DataFrame

In [112]:
# "Month" #alpha to num

In [113]:
month_mapping = {
    'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
    'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12
}
def month_name_to_number(month_name):
    try:
        return month_mapping.get(month_name, None)  # Return None if the month is invalid
    except Exception as e:
        return None
month_name_to_number_udf = udf(month_name_to_number, IntegerType())

In [114]:
crm_df_reduced = crm_df_reduced.withColumn('Month', month_name_to_number_udf(col('Month')))

In [115]:
crm_df_reduced.select('Month').limit(5).show()

+-----+
|Month|
+-----+
|    4|
|    7|
|    8|
|    8|
|    6|
+-----+



In [116]:
# "DueDate", "IssueDate" "DD/MM/YYYY"

In [117]:
datesindiffMonthformat = ["DueDate","IssueDate"]
for date in datesindiffMonthformat:
    crm_df_reduced = crm_df_reduced.withColumn(date, to_timestamp(col(date), 'dd/MM/yy'))
    print(crm_df_reduced.schema[date].dataType)

TimestampType()
TimestampType()


In [118]:
columns_to_show = [col for col in datesindiffMonthformat]
crm_df_reduced.filter(crm_df_reduced["IssueDate"].isNotNull()).select("IssueDate").limit(5).show()

+-------------------+
|          IssueDate|
+-------------------+
|2019-08-05 00:00:00|
|2019-07-05 00:00:00|
|2019-09-05 00:00:00|
|2019-08-05 00:00:00|
|2019-08-05 00:00:00|
+-------------------+



In [119]:

from pyspark.sql.functions import regexp_replace, col

def remove_commas_and_convert_to_numerical(df, column_name):
    # Remove commas from the specified column and convert the result to float
    df_updated = df.withColumn(column_name, regexp_replace(col(column_name), ",", "").cast("double"))
    return df_updated


In [120]:
crm_df_reduced.filter(crm_df_reduced["UnitBilled"].isNotNull()).select("UnitBilled").limit(5).show()

+----------+
|UnitBilled|
+----------+
|       280|
|       422|
|       154|
|       250|
|       495|
+----------+



In [121]:
from pyspark.sql.functions import col, regexp_replace

numericalcolumnstofix = ["UnitBilled", "Last Payment Amount"]
for r in numericalcolumnstofix:
    crm_df_reduced = remove_commas_and_convert_to_numerical(crm_df_reduced, r)
    print(crm_df_reduced.schema[r].dataType)


DoubleType()
DoubleType()


In [122]:
crm_df_reduced.filter(crm_df_reduced["Last Payment Amount"].isNotNull()).select("Last Payment Amount", "AccountContract").limit(5).show()

+-------------------+--------------------+
|Last Payment Amount|     AccountContract|
+-------------------+--------------------+
|             1500.0|40002933016432866064|
|             6878.0|40002709964332629117|
|             4149.0|40003394143733313978|
|             1802.0|40002814258532732331|
|             1413.0|40002905226132829687|
+-------------------+--------------------+



## Transformation

In [207]:
from pyspark.sql import functions as F

In [79]:
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.starschema").show()

++
||
++
++



In [463]:
crm_df_reduced = crm_df_reduced.repartition(200)

In [48]:
crm_df_reduced.writeTo("nessie.starschema.crm_fact").createOrReplace()

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  (11 + 1) / 12]
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
                                                                                

In [None]:
# drop Customer Status from crm.fact

In [None]:
# crm
# Total complaints
# Most Occured Subject 

# Average Complaint Resolution Time (hrs)
# oms
# Average Fault Duration Time (hrs)

In [9]:
#TEST ONLY 
crm_df_reduced = spark.read.table("nessie.starschema.crm_fact")


SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


In [10]:
# df_reduced.count()
crm_df_reduced.dtypes

[('AccountContract', 'decimal(20,0)'),
 ('Ticket No.', 'bigint'),
 ('Business_Partner', 'string'),
 ('GUID', 'string'),
 ('IBCName', 'string'),
 ('PROCESS_TYPE', 'string'),
 ('CREATED_BY', 'string'),
 ('CREATED_AT', 'timestamp'),
 ('UDATETIME', 'timestamp'),
 ('LatestStatus', 'string'),
 ('Supervisor', 'string'),
 ('Lineman', 'string'),
 ('MTLNo', 'string'),
 ('CompletedDateTime', 'timestamp'),
 ('ClosedDateTime', 'timestamp'),
 ('InProcessDateTime', 'timestamp'),
 ('OpenDateTime', 'timestamp'),
 ('Medium Of Complaint', 'string'),
 ('IBCCode', 'string'),
 ('CREATED_DATE', 'timestamp'),
 ('Source', 'string'),
 ('Created By', 'string'),
 ('Region', 'string'),
 ('Subject_ID', 'string'),
 ('Reason_ID', 'string'),
 ('Reason', 'string'),
 ('Subject', 'string'),
 ('ReOpenCountTemp', 'int'),
 ('Year', 'int'),
 ('Month', 'int'),
 ('ReOpenCount', 'int'),
 ('Closed Tag', 'string'),
 ('TATBusted', 'string'),
 ('SDR/Not SDR', 'string'),
 ('TimeElapsedAtLastReload', 'double'),
 ('OIP', 'string'),
 (

In [14]:
# 2. Consumer_Dim
crm_consumer_dim_ = crm_df_reduced.groupBy("AccountContract", "Month", "Year").agg(
    F.count("AccountContract").alias("Complaint Counts (Apr-Sep_23-24)"),  # Count number of records per AccountContract
    F.avg("GroundTAT(minutes)").alias("Average Complaint Resolution Time (mins)"), F.first("StarCustomer").alias("StarCustomer")
)
crm_consumer_dim_.limit(5).show()



+---------------+-----+----+--------------------------------+----------------------------------------+------------+
|AccountContract|Month|Year|Complaint Counts (Apr-Sep_23-24)|Average Complaint Resolution Time (mins)|StarCustomer|
+---------------+-----+----+--------------------------------+----------------------------------------+------------+
|           NULL|    5|2023|                               0|                      197.22461641751804|        NULL|
|           NULL|    8|2023|                               0|                      147.53877491853675|        NULL|
|           NULL|    8|2024|                               0|                      367.84687323868275|        NULL|
|           NULL|    9|2023|                               0|                       193.8000005086263|        NULL|
|   400000646456|    8|2023|                               1|                                   342.0|      Others|
+---------------+-----+----+--------------------------------+-----------

                                                                                

In [15]:
# spark.sql("DROP TABLE nessie.starschema.crm_consumer_dim").show()

++
||
++
++



In [16]:
crm_consumer_dim_.writeTo("nessie.starschema.crm_consumer_dim").createOrReplace()

                                                                                

In [208]:
#TEST ONLY 
ageing_new_data = spark.read.table("nessie.starschema.ageing_fact")

In [None]:
ageing_new_data.dtypes

In [32]:
consumer_ageing_crm_dim.writeTo("nessie.starschema.consumer_ageing_crm_dim").createOrReplace()
# would be
# consumer_ageing_crm_dim.writeTo("nessie.starschema.consumer_ageing_crm_dim").append()

                                                                                

### IBC

In [224]:
#TEST ONLY 
crm_df_reduced = spark.read.table("nessie.starschema.crm_fact")

In [223]:
from pyspark.sql import functions as F

# Grouping and aggregating the IBC data
crm_ibc_dim = crm_df_reduced.groupBy("IBCName", "IBCCode","Year", "Month").agg(
    F.count("IBCName").alias("Complaint Counts"),  # Count the occurrences of IBCName
    F.avg("GroundTAT(minutes)").alias("Average Complaint Resolution Time (mins)")
)


In [107]:
crm_ibc_dim = crm_ibc_dim.repartition(100)

In [108]:
# 1. OMS fact
crm_ibc_dim = crm_ibc_dim.filter(
    (col("Month").isNotNull()) & (col("Year").isNotNull() & (col("IBCName").isNotNull()))
)

In [109]:
spark.sql("DROP TABLE IF EXISTS nessie.starschema.ibc_crm_dim")

DataFrame[]

In [110]:
crm_ibc_dim.writeTo("nessie.starschema.ibc_crm_dim").createOrReplace()

                                                                                

#### CRM PMT dim 

In [123]:

pmt_crm_dim = crm_df_reduced.groupBy("PMT", "PMT Name","Year", "Month").agg(
    F.count("IBCName").alias("Complaint Counts"),  # Count the occurrences of IBCName
    F.avg("GroundTAT(minutes)").alias("Average Complaint Resolution Time (mins)")
)

In [124]:
spark.sql("DROP TABLE IF EXISTS nessie.starschema.pmt_dim")

DataFrame[]

In [126]:
crm_pmt_dim.writeTo("nessie.starschema.pmt_crm_dim").createOrReplace()

                                                                                

### 

#### Date

In [248]:
from pyspark.sql import functions as F

# Group by the date part of CREATED_DATE
crm_date_dim = crm_df_reduced.groupBy(F.to_date("CREATED_DATE").alias("Date"),"IBCCode","IBCName").agg(
    F.count("CREATED_DATE").alias("Complaint Counts"),
    F.avg("GroundTAT(minutes)").alias("Average Complaint Resolution Time (mins)")
)


In [249]:
crm_date_dim.limit(1).show()



+----------+-------+-------+----------------+----------------------------------------+
|      Date|IBCCode|IBCName|Complaint Counts|Average Complaint Resolution Time (mins)|
+----------+-------+-------+----------------+----------------------------------------+
|2023-08-14|    138| Garden|             312|                      177.06153851289017|
+----------+-------+-------+----------------+----------------------------------------+



                                                                                

In [250]:
spark.sql("DROP TABLE IF EXISTS nessie.starschema.crm_date_dim")
crm_date_dim.writeTo("nessie.starschema.crm_date_dim").createOrReplace()

                                                                                

#### Feeder

In [271]:
from pyspark.sql import functions as F

# Group by the date part of CREATED_DATE
crm_feeder_dim = crm_df_reduced.groupBy(F.to_date("CREATED_DATE").alias("Date"), "Feeder ID", "Feeder Name").agg(
    F.count("Feeder ID").alias("Complaint Counts"),
    F.avg("GroundTAT(minutes)").alias("Average Complaint Resolution Time (mins)")
)
crm_feeder_dim.writeTo("nessie.starschema.crm_feeder_dim").createOrReplace()

                                                                                

In [60]:
spark.stop()

## OMS

### Cleaning

In [35]:
spark = SparkSession.builder.master("local[*]").config(conf=conf).getOrCreate()
print("Spark Session Started")

Spark Session Started


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 56816)
Traceback (most recent call last):
  File "/usr/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/usr/local/lib/python3.10/dist-packages/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/accumulators.py", line 267, in poll
    if self.rfile in r and func():
  File "/usr/local/lib/python3.10/dist-packages/pyspark/accumulators.py", line 271, in accum_updates
    num_updates = read_int(

In [5]:
# spark.stop()

In [6]:
from pyspark.sql.functions import when, col

In [7]:
import re
from pyspark.sql.functions import lit
from pyspark.sql.functions import when, col
csv = "/mnt/HabibData/OMSRequestedData.csv"
# match = re.search(r'_(\d{6})', csv)# Extract month and year from the filename
# billing_month = match.group(1) if match else "Unknown"

# Read the CSV file into a DataFrame
oms_df = spark.read \
    .option("delimiter", ",") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(csv)

                                                                                

In [8]:
oms_df.select("Createdon").limit(10).show()

+-------------+
|    Createdon|
+-------------+
|1/1/2023 0:09|
|1/1/2023 1:21|
|1/1/2023 1:21|
|1/1/2023 1:46|
|1/1/2023 2:34|
|         NULL|
|           LL|
|1/1/2023 5:34|
|1/1/2023 5:35|
|1/1/2023 7:14|
+-------------+



In [9]:
# NULL values
columns_to_remove = ["pt_name","load", "Relay", "planned_sd_Id","faultstr", 
"consumer_count_fdr", "saifi", "saidi", "_c36","_c37", "_c38", "_c39","_c40", "_c41", "_c42", "_c43", "_c44"]

In [10]:
# # Filter the DataFrame for outage_Id == 786900
# punching_delay = oms_df.filter(oms_df.outage_Id == '786900').select("PunchingDelay")

# # Show the result
# punching_delay.show(truncate=False)


In [11]:
oms_df_reduced = oms_df.drop(*columns_to_remove)    # Drop the columns from the DataFrame
num_columns = len(oms_df_reduced.columns)
print(f"Number of columns: {num_columns}")

Number of columns: 28


In [12]:
oms_df_reduced.count()

88487

In [13]:
from pyspark.sql import functions as F

# Filter for rows where "TAT" is either a number (matching the regex) or null
oms_df_reduced = oms_df_reduced.filter(
    (F.col("TAT").rlike("^[0-9]+$")) | (F.col("TAT").isNull())
)


oms_df_reduced = oms_df_reduced.withColumn(
    "TAT",(col("TAT").cast("double"))
)


In [14]:
oms_df_reduced.count()

                                                                                

82956

In [15]:
oms_df_reduced.dtypes

[('outage_Id', 'string'),
 ('NetworkLevel', 'string'),
 ('OutageType', 'string'),
 ('outageSubType', 'string'),
 ('htclosingtype', 'string'),
 ('htclosingsubtype', 'string'),
 ('htclosingreason', 'string'),
 ('initialoffreason', 'string'),
 ('grid_name', 'string'),
 ('fdr_Id', 'string'),
 ('fdr_name', 'string'),
 ('dts_Id', 'string'),
 ('dts_name', 'string'),
 ('cluster', 'string'),
 ('IBC', 'string'),
 ('TAT', 'double'),
 ('PunchingDelay', 'string'),
 ('OutageStatus', 'string'),
 ('Createdon', 'string'),
 ('punchCreateAt', 'string'),
 ('punchCloseAt', 'string'),
 ('duration', 'string'),
 ('isemergency', 'string'),
 ('israintripping', 'string'),
 ('cluster_Id', 'string'),
 ('closing_remarks', 'string'),
 ('loss_category', 'string'),
 ('lastupdatedon', 'string')]

In [16]:
oms_df_reduced = oms_df_reduced.repartition(100)  # or less/more based on your cluster size

In [25]:
oms_df_reduced = replace_invalid_strings_with_null(oms_df_reduced)

In [26]:
oms_df_reduced.count()

82956

In [27]:
oms_df_reduced.select("PunchingDelay").limit(150).show()
# oms_df_reduced.filter(oms_df_reduced["lastupdatedon"].isNotNull()).select("lastupdatedon").limit(5).show()

[Stage 21:>                                                         (0 + 6) / 6]

+-------------+
|PunchingDelay|
+-------------+
|            9|
|           12|
|            4|
|           40|
|         NULL|
|         NULL|
|         NULL|
|         NULL|
|         NULL|
|         NULL|
|         NULL|
|         NULL|
|         NULL|
|         NULL|
|         NULL|
|           11|
|            6|
|            8|
|            3|
|            3|
+-------------+
only showing top 20 rows



                                                                                

In [28]:
from pyspark.sql.functions import when, col, to_timestamp
from pyspark.sql import functions as F
from pyspark.sql.functions import to_timestamp, col, when


# # Clean non-datetime rows, e.g., "LL" or empty
# oms_df_reduced = oms_df_reduced.withColumn(
#     "lastupdatedon",
#     when(col("lastupdatedon").rlike("^[0-9]{1,2}/[0-9]{1,2}/[0-9]{4} [0-9]{1,2}:[0-9]{2}$"), 
#          to_timestamp(col("lastupdatedon"), "M/d/yy h:mm"))
#     .otherwise(None)  # Will set to null for invalid formats
# )

# Repeat for other columns if needed
datetime_cols = ["punchCreateAt", "punchCloseAt", "Createdon","lastupdatedon","lastupdatedon"]

# Loop through datetime columns
for col_name in datetime_cols:
    oms_df_reduced = oms_df_reduced.withColumn(
        col_name,
    when(
        col(col_name).rlike(r"^\d{1,2}/\d{1,2}/\d{4} \d{1,2}:\d{2}$"),  # M/d/yyyy h:mm or MM/dd/yyyy HH:mm
        to_timestamp(col(col_name), "M/d/yyyy H:mm")
    )
    .when(
        col(col_name).rlike(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$"),  # yyyy-MM-dd HH:mm:ss
        to_timestamp(col(col_name), "yyyy-MM-dd HH:mm:ss")
    )
        # when(
        #     col(col_name).rlike("^[0-9]{1,2}/[0-9]{1,2}/[0-9]{4} [0-9]{1,2}:[0-9]{2}$"),  # M/d/yyyy h:mm format
        #     to_timestamp(col(col_name), "M/d/yyyy h:mm")
        # )
        # .when(
        #     col(col_name).rlike("^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}$"),  # MM/d/yyyy h:mm format
        #     to_timestamp(col(col_name), "yyyy-MM-dd HH:mm:ss")
        # )
        # .when(
        #     col(col_name).rlike("^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}$"),  # M/dd/yyyy h:mm format
        #     to_timestamp(col(col_name), "yyyy-MM-dd HH:mm:ss")
        # )
        # .when(
        #     col(col_name).rlike("^[0-9]{1,2}/[0-9]{1,2}/[0-9]{4} [0-9]{1,2}:[0-9]{2}$"),  # MM/dd/yyyy h:mm format
        #     to_timestamp(col(col_name), "M/d/yyyy h:mm")
        # )
        # .when(
        #     col(col_name).rlike("^[0-9]{1,2}/[0-9]{1,2}/[0-9]{4} [0-9]{1,2}:[0-9]{2}$"),  # M/d/yyyy HH:mm format
        #     to_timestamp(col(col_name), "M/d/yyyy h:mm")
        # )
        # .when(
        #     col(col_name).rlike("^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}$"),  # MM/d/yyyy HH:mm format
        #     to_timestamp(col(col_name), "yyyy-MM-dd HH:mm:ss")
        # )
        # .when(
        #     col(col_name).rlike("^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}$"),  # M/dd/yyyy HH:mm format
        #     to_timestamp(col(col_name), "yyyy-MM-dd HH:mm:ss")
        # )
        # .when(
        #     col(col_name).rlike("^[0-9]{1,2}/[0-9]{1,2}/[0-9]{4} [0-9]{1,2}:[0-9]{2}$"),  # MM/dd/yyyy HH:mm format
        #     to_timestamp(col(col_name), "M/d/yyyy h:mm")
        # )
        .otherwise(None)
    )

# Show the updated DataFrame
oms_df_reduced.show(truncate=False)


+---------+------------+----------+-------------+-------------------+--------------------------------------------+---------------------------------+-----------------+----------+------+-----------------------------+------+----------------------------------------+-------+-----------+------+-------------+------------+-------------------+-------------------+-------------------+--------+-----------+--------------+----------+-------------------------------------------------------------------------------------------------+-------------+-------------+
|outage_Id|NetworkLevel|OutageType|outageSubType|htclosingtype      |htclosingsubtype                            |htclosingreason                  |initialoffreason |grid_name |fdr_Id|fdr_name                     |dts_Id|dts_name                                |cluster|IBC        |TAT   |PunchingDelay|OutageStatus|Createdon          |punchCreateAt      |punchCloseAt       |duration|isemergency|israintripping|cluster_Id|closing_remarks            

In [29]:
oms_df_reduced.count()

82956

In [30]:
from pyspark.sql.functions import col,when


# Keep only rows where PunchingDelay is NULL or fully numeric
oms_df_reduced = oms_df_reduced.filter(
    col("PunchingDelay").rlike("^[0-9]+$") | col("PunchingDelay").isNull()
)

oms_df_reduced = oms_df_reduced.withColumn(
    "PunchingDelay",(col("PunchingDelay").cast("double"))
)


In [79]:
#homogenizing name because oms does not have ibc code for join
from pyspark.sql import functions as F

# Update the 'IBC' column to replace the specific strings
oms_df_reduced = oms_df_reduced.withColumn(
    "IBC", 
    F.when(F.col("IBC") == "Liaquatabad", "Liaqatabad")
     .when(F.col("IBC") == "FB Area", "F.B. Area")
     .otherwise(F.col("IBC"))  # Keep other values unchanged
)

# Show the updated DataFrame
oms_df_reduced.show(truncate=False)




+---------+------------+----------+-------------+-------------------+--------------------------------------------+---------------------------------+-----------------+----------+------+-----------------------------+------+----------------------------------------+-------+----------+------+-------------+------------+-------------------+-------------------+-------------------+------------------+-----------+--------------+----------+-------------------------------------------------------------------------------------------------+-------------+-------------+----+-----+
|outage_Id|NetworkLevel|OutageType|outageSubType|htclosingtype      |htclosingsubtype                            |htclosingreason                  |initialoffreason |grid_name |fdr_Id|fdr_name                     |dts_Id|dts_name                                |cluster|IBC       |TAT   |PunchingDelay|OutageStatus|Createdon          |punchCreateAt      |punchCloseAt       |duration          |isemergency|israintripping|cluster_I

                                                                                

In [80]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
#CA Creation Date (MM/DD/YYYY), Contract Creation Date(DD/MM/YYYY), Last DC Date (DD-mon-YYYY) (same as last payment)
oms_df_reduced.filter(oms_df_reduced["Createdon"].isNotNull()).select("Createdon").count()

77385

In [81]:
#convert duration to minutes hh:mm:ss
oms_df_reduced = oms_df_reduced.withColumn("duration", hhmmss_to_minutes_udf(col("duration"))) 
oms_df_reduced.select("duration").limit(5).show()
oms_df_reduced = convert_column_to_double(oms_df_reduced,"duration")

+----------+
|  duration|
+----------+
| 55.366665|
| 38.516666|
|  837.9667|
| 65.433334|
|107.166664|
+----------+



In [82]:

oms_df_reduced.select("duration").limit(5).show()

+------------------+
|          duration|
+------------------+
| 55.36666488647461|
|38.516666412353516|
| 837.9666748046875|
| 65.43333435058594|
|107.16666412353516|
+------------------+



In [83]:
from pyspark.sql.functions import col, when

# # Filter out rows where 'isemergency' is numeric, null, or contains only whitespace
# check = df_reduced.filter(
#     (col("isemergency").rlike("^[0-9]+$")) | 
#     (col("isemergency").isNull()) | 
#     (col("isemergency").rlike("^\\s*$"))  # matches whitespace-only strings
# )

# Replace invalid values (including whitespace) with 0, then cast to int
oms_df_reduced = oms_df_reduced.withColumn(
    "isemergency",
    when(col("isemergency").isNull(), 0)
    .when(col("isemergency").rlike("^\\s*$"), 0)  # Replace whitespace with 0
    .otherwise(col("isemergency").cast("int"))
)

In [84]:
oms_df_reduced.count()

82951

In [85]:
oms_df_reduced = oms_df_reduced.withColumn("israintripping", when(col("israintripping") == "YES", 1).otherwise(0))

In [86]:
# df_reduced.limit(10).show()
# df_reduced.show()  # First 20 rows by default
# df.show(50)  # Show 50 rows
oms_df_reduced.show(truncate=False)  # Don't cut off long strings




+---------+------------+----------+-------------+-------------------+------------------------------+---------------+-------------------+------------+------+-------------------------------------+------+----------------------------------------+-------+----------+--------+-------------+------------+-------------------+-------------------+-------------------+------------------+-----------+--------------+----------+----------------------------------------------------------------------+-------------+-------------+----+-----+
|outage_Id|NetworkLevel|OutageType|outageSubType|htclosingtype      |htclosingsubtype              |htclosingreason|initialoffreason   |grid_name   |fdr_Id|fdr_name                             |dts_Id|dts_name                                |cluster|IBC       |TAT     |PunchingDelay|OutageStatus|Createdon          |punchCreateAt      |punchCloseAt       |duration          |isemergency|israintripping|cluster_Id|closing_remarks                                              

                                                                                

### Transformation

In [237]:
from pyspark.sql.functions import col, to_date, year, month, sum as _sum
from pyspark.sql import functions as F
from pyspark.sql.functions import col


In [88]:
oms_df_reduced.count()

82951

In [89]:
spark.sql("DROP table nessie.starschema.oms_fact").show()


++
||
++
++



In [90]:
# 1. OMS fact
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.starschema")
oms_df_reduced.writeTo("nessie.starschema.oms_fact").createOrReplace()

                                                                                

In [273]:
#TEST ONLY 
oms_df_reduced = spark.read.table("nessie.starschema.oms_fact")

### IBC

In [91]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Extract Year and Month from the CreatedOn column
oms_df_reduced = oms_df_reduced.withColumn("Year", F.year("CreatedOn")) \
    .withColumn("Month", F.month("CreatedOn"))

# Group by IBC, Year, Month and calculate frequency of each fault (initialoffreason)
most_frequent_fault = oms_df_reduced.groupBy("IBC", "Year", "Month", "initialoffreason").agg(
    F.count("initialoffreason").alias("Most_Occurred_Fault_Frequency")
)

# Now, for each IBC, Year, and Month, we want the fault with the highest count
most_frequent_fault = most_frequent_fault.withColumn(
    "rank", F.row_number().over(
        Window.partitionBy("IBC", "Year", "Month").orderBy(F.desc("Most_Occurred_Fault_Frequency"))
    )
)

# Filter to get only the most frequent fault for each IBC, Year, and Month
most_frequent_fault = most_frequent_fault.filter(F.col("rank") == 1).drop("rank")

# # Rename IBC to IBC_Fault for clarity
# most_frequent_fault = most_frequent_fault.withColumnRenamed("IBC", "IBC_Fault")
# most_frequent_fault = most_frequent_fault.withColumnRenamed("Month", "Month_Fault")
# most_frequent_fault = most_frequent_fault.withColumnRenamed("Year", "Year_Fault")

# Now, we will aggregate the other fields for the IBC and join with the most frequent fault
oms_ibc_dim = oms_df_reduced.groupBy("IBC", "Year", "Month").agg(
    F.avg("TAT").alias("Average Fault Turn-Around Time (mins)"),
    F.avg("duration").alias("Average Fault Duration (mins)"),
    F.count("outage_Id").alias("Total Outages/Faults"),
    F.sum("israintripping").alias("Rain Frequency")
)

# Join the most frequent fault with the rest of the aggregation
oms_ibc_dim_j = oms_ibc_dim.join(most_frequent_fault, 
                       on=["IBC","Year","Month"],
                       how="left")

# # Rename columns to avoid ambiguity before the final select
# oms_ibc_dim = oms_ibc_dim.withColumnRenamed("Year", "oms_Year") \
#                          .withColumnRenamed("Month", "oms_Month")

# most_frequent_fault = most_frequent_fault.withColumnRenamed("Year", "fault_Year") \
#                                            .withColumnRenamed("Month", "fault_Month")

# Final selection of columns, including the most frequent fault
oms_ibc_dim_j = oms_ibc_dim_j.select(
    # Rename columns for clarity
    F.col("Month").alias("Month"),
    F.col("Year").alias("Year"),
    "IBC", 
    "Average Fault Turn-Around Time (mins)", 
    "Average Fault Duration (mins)", 
    "Total Outages/Faults", 
    "Rain Frequency", 
    F.col("initialoffreason").alias("Most_Occurred_Fault"), 
    "Most_Occurred_Fault_Frequency"
)

# Show the result
oms_ibc_dim_j.show(truncate=False)




+-----+----+----------+-------------------------------------+-----------------------------+--------------------+--------------+-------------------+-----------------------------+
|Month|Year|IBC       |Average Fault Turn-Around Time (mins)|Average Fault Duration (mins)|Total Outages/Faults|Rain Frequency|Most_Occurred_Fault|Most_Occurred_Fault_Frequency|
+-----+----+----------+-------------------------------------+-----------------------------+--------------------+--------------+-------------------+-----------------------------+
|3    |2024|F.B. Area |12548.455188679245                   |192.87223607655298           |424                 |0             |Job of Operation   |182                          |
|2    |2023|Gulshan   |17831.50980392157                    |147.82355213657743           |357                 |0             |Shutdown           |88                           |
|6    |2023|Gulshan   |17547.137339055793                   |130.56513362291955           |699                

                                                                                

In [64]:
oms_ibc_dim_j.select("Year","Month").limit(5).show()



+----+-----+
|Year|Month|
+----+-----+
|2024|    3|
|2023|    2|
|2023|    2|
|2023|    6|
|2024|    1|
+----+-----+



                                                                                

In [92]:
oms_ibc_dim_j.count()

                                                                                

105

In [93]:
# 1. OMS fact
oms_ibc_dim_j = oms_ibc_dim_j.filter(
    (col("Month").isNotNull()) & (col("Year").isNotNull())
)

In [94]:
spark.sql("DROP TABLE nessie.starschema.ibc_oms_dim")

DataFrame[]

In [95]:

spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.starschema")
oms_ibc_dim_j.writeTo("nessie.starschema.ibc_oms_dim").createOrReplace()

                                                                                

#### With CRM

#### Date

In [261]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Convert timestamp to date only
oms_df_reduced = oms_df_reduced.withColumn("Date", F.to_date("CreatedOn"))

# Group by IBC and Date to calculate frequency of each fault
most_frequent_fault = oms_df_reduced.groupBy("IBC", "Date", "initialoffreason").agg(
    F.count("initialoffreason").alias("Most_Occurred_Fault_Frequency")
)

# Find the most frequent fault per IBC and Date
most_frequent_fault = most_frequent_fault.withColumn(
    "rank", F.row_number().over(
        Window.partitionBy("IBC", "Date").orderBy(F.desc("Most_Occurred_Fault_Frequency"))
    )
).filter(F.col("rank") == 1).drop("rank")

# Aggregate the other fields for each IBC and Date
oms_ibc_dim = oms_df_reduced.groupBy("IBC", "Date").agg(
    F.avg("TAT").alias("Average Fault Turn-Around Time (mins)"),
    F.avg("duration").alias("Average Fault Duration (mins)"),
    F.count("outage_Id").alias("Total Outages/Faults"),
    F.avg("PunchingDelay").alias("Average Punching Delay"),

    F.sum("israintripping").alias("Rain Frequency")
)

# Join both datasets
oms_ibc_dim_j = oms_ibc_dim.join(most_frequent_fault, on=["IBC", "Date"], how="left")

# Final selection
oms_ibc_date_dim = oms_ibc_dim_j.select(
    "IBC",
    "Date",
    "Average Fault Turn-Around Time (mins)",
    "Average Fault Duration (mins)",
    "Total Outages/Faults",
    "Average Punching Delay",
    "Rain Frequency",
    F.col("initialoffreason").alias("Most_Occurred_Fault"),
    "Most_Occurred_Fault_Frequency"
)

# Show result
oms_ibc_date_dim.show(truncate=False)




+----------+----------+-------------------------------------+-----------------------------+--------------------+----------------------+--------------+-------------------+-----------------------------+
|IBC       |Date      |Average Fault Turn-Around Time (mins)|Average Fault Duration (mins)|Total Outages/Faults|Average Punching Delay|Rain Frequency|Most_Occurred_Fault|Most_Occurred_Fault_Frequency|
+----------+----------+-------------------------------------+-----------------------------+--------------------+----------------------+--------------+-------------------+-----------------------------+
|Nazimabad |2024-04-02|49934.0                              |164.55208611488342           |9                   |0.1111111111111111    |0             |Breaker Tripped    |4                            |
|F.B. Area |2024-08-18|80.4                                 |79.8008337020874             |20                  |2.45                  |0             |Job of Operation   |8                         

                                                                                

In [262]:
#Reading crm_date
crm_date_dim = spark.read.table("nessie.starschema.crm_date_dim")

In [257]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col


In [266]:
date_ibc_crm_oms = oms_ibc_date_dim.alias("oms_date").join(
    crm_date_dim.alias("crm_date"), 
    (F.col("oms_date.Date") == F.col("crm_date.Date")) &
    (F.col("oms_date.IBC") == F.col("crm_date.IBCName")),#only ibc names in oms
    how='outer'#retaining all info 


     
).groupBy("oms_date.Date","oms_date.IBC" ).agg(
    # First value of relevant oms and crm columns
    F.first("oms_date.Total Outages/Faults").alias("Total Outages/Faults"),
    F.first("oms_date.Most_Occurred_Fault").alias("Most_Occurred_Fault"),
    F.first("oms_date.Most_Occurred_Fault_Frequency").alias("Most_Occurred_Fault_Frequency"),
    F.first("oms_date.Average Fault Duration (mins)").alias("Average Fault Duration (mins)"),
    F.first("oms_date.Average Fault Turn-Around Time (mins)").alias("Average Fault Turn-Around Time (mins)"),
    F.first("oms_date.Average Punching Delay").alias("Average Punching Delay"),
    F.first("oms_date.Rain Frequency").alias("Rain Frequency"),
    F.first("crm_date.Complaint Counts").alias("Complaint Counts"),    
    F.first("crm_date.Average Complaint Resolution Time (mins)").alias("Average Complaint Resolution Time (mins)")
 
)
    
date_ibc_crm_oms.show(truncate=False)

                                                                                

+----------+----------+--------------------+-------------------+-----------------------------+-----------------------------+-------------------------------------+----------------------+--------------+----------------+----------------------------------------+
|Date      |IBC       |Total Outages/Faults|Most_Occurred_Fault|Most_Occurred_Fault_Frequency|Average Fault Duration (mins)|Average Fault Turn-Around Time (mins)|Average Punching Delay|Rain Frequency|Complaint Counts|Average Complaint Resolution Time (mins)|
+----------+----------+--------------------+-------------------+-----------------------------+-----------------------------+-------------------------------------+----------------------+--------------+----------------+----------------------------------------+
|NULL      |NULL      |5561                |NULL               |NULL                         |NULL                         |NULL                                 |NULL                  |0             |NULL            |NULL  

In [267]:
spark.sql("DROP TABLE nessie.starschema.date_ibc_crm_oms")

DataFrame[]

In [268]:

date_ibc_crm_oms.writeTo("nessie.starschema.date_ibc_crm_oms").createOrReplace()

                                                                                

### Feeder

In [None]:
#TEST ONLY 
oms_df_reduced = spark.read.table("nessie.starschema.oms_fact")

In [276]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Convert timestamp to date only
oms_df_reduced = oms_df_reduced.withColumn("Date", F.to_date("CreatedOn"))

# Group by Feeder ID and Date to calculate frequency of each fault
most_frequent_fault = oms_df_reduced.groupBy("fdr_Id", "Date", "initialoffreason").agg(
    F.count("initialoffreason").alias("Most_Occurred_Fault_Frequency")
)

# Find the most frequent fault per Feeder ID and Date
most_frequent_fault = most_frequent_fault.withColumn(
    "rank", F.row_number().over(
        Window.partitionBy("fdr_Id", "Date").orderBy(F.desc("Most_Occurred_Fault_Frequency"))
    )
).filter(F.col("rank") == 1).drop("rank")

# Aggregate the other fields for each Feeder ID and Date
oms_feeder_dim = oms_df_reduced.groupBy("fdr_Id", "Date").agg(
    F.first("fdr_name").alias("Feeder Name"),
    F.avg("TAT").alias("Average Fault Turn-Around Time (mins)"),
    F.avg("duration").alias("Average Fault Duration (mins)"),
    F.count("outage_Id").alias("Total Outages/Faults"),
    F.avg("PunchingDelay").alias("Average Punching Delay"),
    F.sum("israintripping").alias("Rain Frequency")
)

# Join both datasets
oms_feeder_dim_j = oms_feeder_dim.join(most_frequent_fault, on=["fdr_Id", "Date"], how="left")

# Final selection
oms_feeder_date_dim = oms_feeder_dim_j.select(
    "Date",
    F.col("fdr_Id").alias("Feeder ID"),
    "Feeder Name",
    "Average Fault Turn-Around Time (mins)",
    "Average Fault Duration (mins)",
    "Total Outages/Faults",
    "Average Punching Delay",
    "Rain Frequency",
    F.col("initialoffreason").alias("Most_Occurred_Fault"),
    "Most_Occurred_Fault_Frequency"
)

# Show result
oms_feeder_date_dim.show(truncate=False)


                                                                                

+----------+---------+-----------+-------------------------------------+-----------------------------+--------------------+----------------------+--------------+-------------------+-----------------------------+
|Date      |Feeder ID|Feeder Name|Average Fault Turn-Around Time (mins)|Average Fault Duration (mins)|Total Outages/Faults|Average Punching Delay|Rain Frequency|Most_Occurred_Fault|Most_Occurred_Fault_Frequency|
+----------+---------+-----------+-------------------------------------+-----------------------------+--------------------+----------------------+--------------+-------------------+-----------------------------+
|2023-01-16|102      |EAGLE SQUAD|54.0                                 |51.849998474121094           |1                   |1.0                   |0             |NULL               |1                            |
|2023-02-09|102      |EAGLE SQUAD|106.0                                |104.73332977294922           |1                   |0.0                   |0     

In [277]:
#Reading crm_date
crm_feeder_dim = spark.read.table("nessie.starschema.crm_feeder_dim")

In [280]:
date_feeder_crm_oms = oms_feeder_date_dim.alias("oms_feeder").join(
    crm_feeder_dim.alias("crm_feeder"), 
    (F.col("oms_feeder.Date") == F.col("crm_feeder.Date")) &
    (F.col("oms_feeder.Feeder ID") == F.col("crm_feeder.Feeder ID")),#only ibc names in oms
    how='outer'#retaining all info 


     
).groupBy("oms_feeder.Date","oms_feeder.Feeder ID" ).agg(
    # First value of relevant oms and crm columns
    F.first("oms_feeder.Feeder Name").alias("Feeder Name"),
    F.first("oms_feeder.Total Outages/Faults").alias("Total Outages/Faults"),
    F.first("oms_feeder.Most_Occurred_Fault").alias("Most_Occurred_Fault"),
    F.first("oms_feeder.Most_Occurred_Fault_Frequency").alias("Most_Occurred_Fault_Frequency"),
    F.first("oms_feeder.Average Fault Duration (mins)").alias("Average Fault Duration (mins)"),
    F.first("oms_feeder.Average Fault Turn-Around Time (mins)").alias("Average Fault Turn-Around Time (mins)"),
    F.first("oms_feeder.Average Punching Delay").alias("Average Punching Delay"),
    F.first("oms_feeder.Rain Frequency").alias("Rain Frequency"),
    F.first("crm_feeder.Complaint Counts").alias("Complaint Counts"),    
    F.first("crm_feeder.Average Complaint Resolution Time (mins)").alias("Average Complaint Resolution Time (mins)")
 
)
    
date_feeder_crm_oms.show(truncate=False)


                                                                                

+----------+---------+--------------------------------+--------------------+-------------------+-----------------------------+-----------------------------+-------------------------------------+----------------------+--------------+----------------+----------------------------------------+
|Date      |Feeder ID|Feeder Name                     |Total Outages/Faults|Most_Occurred_Fault|Most_Occurred_Fault_Frequency|Average Fault Duration (mins)|Average Fault Turn-Around Time (mins)|Average Punching Delay|Rain Frequency|Complaint Counts|Average Complaint Resolution Time (mins)|
+----------+---------+--------------------------------+--------------------+-------------------+-----------------------------+-----------------------------+-------------------------------------+----------------------+--------------+----------------+----------------------------------------+
|2023-01-01|109      |EXPO CENTER                     |1                   |NULL               |1                            |1

In [282]:
spark.sql("DROP TABLE nessie.starschema.date_feeder_crm_oms")
date_feeder_crm_oms.writeTo("nessie.starschema.date_feeder_crm_oms").createOrReplace()

                                                                                

#### merging crm ibc with customer IBC

In [6]:
crm_ibc_dim = spark.read.table("nessie.starschema.ibc_crm_dim")
oms_ibc_dim_j = spark.read.table("nessie.starschema.ibc_oms_dim")

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


In [18]:
# so its easier to drop
crm_ibc_dim = crm_ibc_dim \
    .withColumnRenamed("Year", "Year1") \
    .withColumnRenamed("Month", "Month1")


In [20]:
# Perform a full outer join to get rows from both crm_ibc_dim and oms_ibc_dim
# oms is much lesser than crm 

ibc_oms_crm_dim = oms_ibc_dim_j.join(
    crm_ibc_dim,
    on=[oms_ibc_dim_j.IBC==crm_ibc_dim.IBCName,oms_ibc_dim_j.Year==crm_ibc_dim.Year1,oms_ibc_dim_j.Month==crm_ibc_dim.Month1],  # Ensure you're joining on the IBCName column
    how="inner"      # Use "left" because oms has lesser data than crm  and we only want mapping of ibcs that exist in both 
)

In [23]:
# Display the final merged result
ibc_oms_crm_dim.show(truncate=False)

+-----+----+----------+-------------------------------------+-----------------------------+--------------------+--------------+-------------------+-----------------------------+-------+----------------+----------------------------------------+
|Month|Year|IBC       |Average Fault Turn-Around Time (mins)|Average Fault Duration (mins)|Total Outages/Faults|Rain Frequency|Most_Occurred_Fault|Most_Occurred_Fault_Frequency|IBCCode|Complaint Counts|Average Complaint Resolution Time (mins)|
+-----+----+----------+-------------------------------------+-----------------------------+--------------------+--------------+-------------------+-----------------------------+-------+----------------+----------------------------------------+
|8    |2023|Nazimabad |16946.074162679426                   |75.52139364193879            |418                 |0             |Forced Outage      |144                          |136    |2730            |110.82857149376537                      |
|5    |2023|Nazimabad |6

In [22]:
ibc_oms_crm_dim = ibc_oms_crm_dim.drop("Year1", "Month1", "IBCName")


In [24]:
ibc_oms_crm_dim = ibc_oms_crm_dim.repartition(200)

In [25]:
ibc_oms_crm_dim.count()

                                                                                

48

In [26]:
spark.sql("DROP TABLE nessie.starschema.ibc_oms_crm_dim")

DataFrame[]

In [27]:
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.starschema")
ibc_oms_crm_dim.writeTo("nessie.starschema.ibc_oms_crm_dim").createOrReplace()

                                                                                

In [512]:
#defining ageing columns 
merged_ibc_dim_omscrm = merged_ibc_dim_omscrm.withColumn(
    "Total_IRB_Amount", F.lit(0).cast("double")
).withColumn(
    "Total_IRB_Units", F.lit(0).cast("double")
).withColumn(
    "Average_IRB_Amount", F.lit(0).cast("double")
).withColumn(
    "Average_IRB_Units", F.lit(0).cast("double")
).withColumn(
    "Total_Sanctioned_Load", F.lit(0).cast("double")
).withColumn(
    "Total_Connected_Load", F.lit(0).cast("double")
).withColumn(
    "Average_Sanctioned_Load", F.lit(0).cast("double")
).withColumn(
    "Average_Connected_Load", F.lit(0).cast("double")
).withColumn(
    "Total_Current_Amount", F.lit(0).cast("double")
).withColumn(
    "Average_Current_Amount", F.lit(0).cast("double")
).withColumn(
    "Total_Current_Units", F.lit(0).cast("double")
).withColumn(
    "Average_Current_Units", F.lit(0).cast("double")
)

# should be able to add ageing with
# merged_ibc_dim_ageing = ageing_ibc_dim.join(
#     merged_ibc_dim_omscrm,
#     on=["IBCName"],  # Ensure you're joining on the IBCName column
#     how="left"       # Use "left" join to keep all rows from oms_ibc_dim and match with crm_ibc_dim
# )

# # Display the final merged result
# merged_ibc_dim_omscrm.show(truncate=False)

##ingest it back into ibc_dim
# merged_ibc_dim_omscrm.write.mode("overwrite").saveAsTable("nessie.starschema.ibc_dim")

In [516]:
# Assuming crm_df and oms_df are Spark DataFrames

distinct_ibcs_crm = crm_df_reduced.select("IBCName").distinct()
distinct_ibcs_oms = oms_df_reduced.select("IBC").distinct()

# Show the results
distinct_ibcs_crm.show(), distinct_ibcs_oms.show()


                                                                                

+-------------+
|      IBCName|
+-------------+
|      Defence|
|       Saddar|
|  NEW KARACHI|
|      Liyar-I|
|     Liyar-II|
|      Gulshan|
|         KIMZ|
|        Gadap|
|       Landhi|
|         SIMZ|
|    F.B. Area|
|  N.Nazimabad|
|  Shah Faisal|
|     Orangi-I|
|    Nazimabad|
|      Korangi|
|          PSC|
|      Clifton|
|North Karachi|
|     Jauhar-I|
+-------------+
only showing top 20 rows

+-----------+
|        IBC|
+-----------+
|Liaquatabad|
|    Gulshan|
|  Nazimabad|
|    FB Area|
+-----------+



(None, None)

In [518]:
# add to starschema as ibc_dim
merged_ibc_dim_omscrm.write.mode("overwrite").saveAsTable("nessie.starschema.ibc_dim")
# merged_ibc_dim_omscrm.writeTo("nessie.starschema.ibc_dim").createOrReplace()

                                                                                ]

In [221]:
df_reduced.withColumn("month", F.month("Createdon")).limit(1).show()

+---------+------------+----------+-------------+-------------+----------------+---------------+-------------------+--------+---------+-------+------+--------------+------+----------------+-------+-------+----+-----+---+-------------+------------+-------------+-------------+-------------+--------+-----------+--------------+----------+---------------+-----+-----+-------------+------------------+-------------+-------------+----+----+----+----+----+----+----+----+----+-----+----+
|outage_Id|NetworkLevel|OutageType|outageSubType|htclosingtype|htclosingsubtype|htclosingreason|   initialoffreason|faultstr|grid_name|pt_name|fdr_Id|      fdr_name|dts_Id|        dts_name|cluster|    IBC|load|Relay|TAT|PunchingDelay|OutageStatus|    Createdon|punchCreateAt| punchCloseAt|duration|isemergency|israintripping|cluster_Id|closing_remarks|saifi|saidi|loss_category|consumer_count_fdr|lastupdatedon|planned_sd_Id|_c36|_c37|_c38|_c39|_c40|_c41|_c42|_c43|_c44|month|year|
+---------+------------+----------+-

25/04/16 22:39:27 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: outage_Id, NetworkLevel, OutageType, outageSubType, htclosingtype, htclosingsubtype, htclosingreason, initialoffreason, faultstr, grid_name, pt_name, fdr_Id, fdr_name, dts_Id, dts_name, cluster, IBC, load, Relay, TAT, PunchingDelay, OutageStatus, Createdon, punchCreateAt, punchCloseAt, duration, isemergency, israintripping, cluster_Id, closing_remarks, saifi, saidi, loss_category, consumer_count_fdr, lastupdatedon, planned_sd_Id, , , , , , , , , 
 Schema: outage_Id, NetworkLevel, OutageType, outageSubType, htclosingtype, htclosingsubtype, htclosingreason, initialoffreason, faultstr, grid_name, pt_name, fdr_Id, fdr_name, dts_Id, dts_name, cluster, IBC, load, Relay, TAT, PunchingDelay, OutageStatus, Createdon, punchCreateAt, punchCloseAt, duration, isemergency, israintripping, cluster_Id, closing_remarks, saifi, saidi, loss_category, consumer_count_fdr, lastupdatedon, planned_sd_Id, _c36, _c37, _

In [406]:
#month
from pyspark.sql import functions as F
from pyspark.sql.window import Window  # Window specification for ranking

# Extract month and year from Createdon column
temp_df = df_reduced.withColumn("month", F.month("Createdon")) \
               .withColumn("year", F.year("Createdon"))

# First, calculate the most frequent fault in "initialoffreason" for each "month"
most_frequent_fault_month = temp_df.groupBy("year", "month", "initialoffreason").agg(
    F.count("initialoffreason").alias("Most_Occurred_Fault_Frequency")
)

# Now, for each year-month, we want the fault with the highest count
most_frequent_fault_month = most_frequent_fault_month.withColumn(
    "rank", F.row_number().over(
        Window.partitionBy("year", "month").orderBy(F.desc("Most_Occurred_Fault_Frequency"))
    )
)

# Filter to get only the most frequent fault for each year-month
most_frequent_fault_month = most_frequent_fault_month.filter(F.col("rank") == 1).drop("rank")

most_frequent_fault_month = most_frequent_fault_month.withColumnRenamed("month", "month_Fault") \
                                                   .withColumnRenamed("year", "year_Fault")

# Now, create the dimension (dim) for Month of `Createdon`
oms_dim_month = temp_df.groupBy("year", "month").agg(
    F.first("month").alias("Month_"),
    F.first("year").alias("Year_"),
    F.avg("TAT").alias("Average Turn-Around Time (mins)"),
    F.avg("duration").alias("Average Fault Duration (mins)"),
    F.count("outage_Id").alias("Total Outages/Faults"),
    F.sum("israintripping").alias("Rain Frequency (23-24)")
)

# Join the most frequent fault with the Month-based aggregation
oms_dim_month = oms_dim_month.join(most_frequent_fault_month, 
                       on=[oms_dim_month.Month_ == most_frequent_fault_month.month_Fault, 
                           oms_dim_month.Year_ == most_frequent_fault_month.year_Fault], 
                       how="left")
oms_dim_month = oms_dim_month.select(
    F.col("month_Fault").alias("Month_"), 
    F.col("year_Fault").alias("Year_"),
    "Average Turn-Around Time (mins)", "Average Fault Duration (mins)", 
    "Total Outages/Faults", "Rain Frequency (23-24)", 
     F.col("initialoffreason").alias("Most_Occurred_Fault"), 
    "Most_Occurred_Fault_Frequency"
)

# Display the final result for the Month of Createdon dimension
oms_dim_month.show(truncate=False)




+------+-----+-------------------------------+-----------------------------+--------------------+----------------------+-------------------+-----------------------------+
|Month_|Year_|Average Turn-Around Time (mins)|Average Fault Duration (mins)|Total Outages/Faults|Rain Frequency (23-24)|Most_Occurred_Fault|Most_Occurred_Fault_Frequency|
+------+-----+-------------------------------+-----------------------------+--------------------+----------------------+-------------------+-----------------------------+
|7     |2024 |3018.4881141045958             |330.58424819474504           |1893                |5                     |Job of Operation   |468                          |
|12    |2024 |2401.005395683453              |1079.3308405302034           |1112                |0                     |Forced Outage      |292                          |
|8     |2023 |10472.540659340659             |180.1895697877508            |910                 |5                     |NULL               |217  

                                                                                

In [408]:
oms_dim_month.dtypes

[('Month_', 'int'),
 ('Year_', 'int'),
 ('Average Turn-Around Time (mins)', 'double'),
 ('Average Fault Duration (mins)', 'double'),
 ('Total Outages/Faults', 'bigint'),
 ('Rain Frequency (23-24)', 'bigint'),
 ('Most_Occurred_Fault', 'string'),
 ('Most_Occurred_Fault_Frequency', 'bigint')]

In [389]:
#date
from pyspark.sql import functions as F
from pyspark.sql.window import Window  # Window specification for ranking

# Extract date from Createdon column
temp_df = df_reduced.withColumn("date", F.to_date("Createdon"))

# First, calculate the most frequent fault in "initialoffreason" for each "date"
most_frequent_fault_date = temp_df.groupBy("date", "initialoffreason").agg(
    F.count("initialoffreason").alias("Most_Occurred_Fault_Frequency")
)

# Now, for each date, we want the fault with the highest count
most_frequent_fault_date = most_frequent_fault_date.withColumn(
    "rank", F.row_number().over(
        Window.partitionBy("date").orderBy(F.desc("Most_Occurred_Fault_Frequency"))
    )
)

# Filter to get only the most frequent fault for each date
most_frequent_fault_date = most_frequent_fault_date.filter(F.col("rank") == 1).drop("rank")
most_frequent_fault_date = most_frequent_fault_date.withColumnRenamed("date", "Date_diff") 
# Now, create the dimension (dim) for Date of Createdon
oms_dim_date = temp_df.groupBy("date").agg(
    F.first("date").alias("Date_"),
    F.avg("TAT").alias("Average Turn-Around Time (mins)"),
    F.avg("duration").alias("Average Fault Duration (mins)"),
    F.count("outage_Id").alias("Total Outages/Faults"),
    F.sum("israintripping").alias("Rain Frequency (23-24)")
)

# Join the most frequent fault with the Date-based aggregation
oms_dim_date = oms_dim_date.join(most_frequent_fault_date, 
                       on=[oms_dim_date.Date_ == most_frequent_fault_date.Date_diff], 
                       how="left")


oms_dim_date = oms_dim_date.select(
    F.col("Date_diff").alias("Date"),
    "Average Turn-Around Time (mins)", "Average Fault Duration (mins)", 
    "Total Outages/Faults", "Rain Frequency (23-24)", 
     F.col("initialoffreason").alias("Most_Occurred_Fault"), 
    "Most_Occurred_Fault_Frequency"
)

# Display the final result for the Date of Createdon dimension
oms_dim_date.show(truncate=False)




+----------+-------------------------------+-----------------------------+--------------------+----------------------+-------------------+-----------------------------+
|Date      |Average Turn-Around Time (mins)|Average Fault Duration (mins)|Total Outages/Faults|Rain Frequency (23-24)|Most_Occurred_Fault|Most_Occurred_Fault_Frequency|
+----------+-------------------------------+-----------------------------+--------------------+----------------------+-------------------+-----------------------------+
|2023-06-22|6418.269503546099              |287.53844794780014           |141                 |0                     |Commercial         |54                           |
|2023-11-08|370.27272727272725             |380.2393975691362            |33                  |0                     |Planned Outage     |6                            |
|2024-05-30|8481.203007518798              |265.40641169181237           |133                 |0                     |Commercial         |73               

                                                                                

In [396]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window  # Window specification for ranking

# First, calculate the most frequent fault in "initialoffreason" for each "fdr_Id" and "fdr_name"
most_frequent_fault_fdr = df_reduced.groupBy("fdr_Id", "fdr_name", "initialoffreason").agg(
    F.count("initialoffreason").alias("Most_Occurred_Fault_Frequency")
)

# Now, for each "fdr_Id" and "fdr_name", we want the fault with the highest count
most_frequent_fault_fdr = most_frequent_fault_fdr.withColumn(
    "rank", F.row_number().over(
        Window.partitionBy("fdr_Id", "fdr_name").orderBy(F.desc("Most_Occurred_Fault_Frequency"))
    )
)

# Filter to get only the most frequent fault for each "fdr_Id" and "fdr_name"
most_frequent_fault_fdr = most_frequent_fault_fdr.filter(F.col("rank") == 1).drop("rank")

# Rename `fdr_Id` and `fdr_name` to avoid ambiguity
most_frequent_fault_fdr = most_frequent_fault_fdr.withColumnRenamed("fdr_Id", "FDR_ID_Fault") \
                                                   .withColumnRenamed("fdr_name", "FDR_Name_Fault")

# Now, create the dimension (dim) for "fdr_Id" and "fdr_name"
oms_dim_fdr = df_reduced.groupBy("fdr_Id", "fdr_name").agg(
    F.first("fdr_Id").alias("FDR_ID_"),
    F.first("fdr_name").alias("FDR_Name_"),
    F.avg("TAT").alias("Average Turn-Around Time (mins)"),
    F.avg("duration").alias("Average Fault Duration (mins)"),
    F.count("outage_Id").alias("Total Outages/Faults"),
    F.sum("israintripping").alias("Rain Frequency (23-24)")
)

# Join the most frequent fault with the "fdr_Id" and "fdr_name" aggregation
oms_dim_fdr = oms_dim_fdr.join(most_frequent_fault_fdr, 
                       on=[oms_dim_fdr.FDR_ID_ == most_frequent_fault_fdr.FDR_ID_Fault, 
                           oms_dim_fdr.FDR_Name_ == most_frequent_fault_fdr.FDR_Name_Fault], 
                       how="left")

# Select distinct columns to avoid ambiguity
oms_dim_fdr = oms_dim_fdr.select(
    "FDR_ID", "FDR_Name", 
    "Average Turn-Around Time (mins)", "Average Fault Duration (mins)", 
    "Total Outages/Faults", "Rain Frequency (23-24)", 
     F.col("initialoffreason").alias("Most_Occurred_Fault"), 
    "Most_Occurred_Fault_Frequency"
)

# Display the final result for the "fdr_Id" and "fdr_name" dimension
oms_dim_fdr.show(truncate=False)




+------+-----------------------------+-------------------------------+-----------------------------+--------------------+----------------------+-------------------+-----------------------------+
|FDR_ID|FDR_Name                     |Average Turn-Around Time (mins)|Average Fault Duration (mins)|Total Outages/Faults|Rain Frequency (23-24)|Most_Occurred_Fault|Most_Occurred_Fault_Frequency|
+------+-----------------------------+-------------------------------+-----------------------------+--------------------+----------------------+-------------------+-----------------------------+
|102   |EAGLE SQUAD                  |4564.562256809339              |507.29518566068356           |514                 |5                     |Voltage Fluctuation|134                          |
|103   |PARAS                        |5446.527876631079              |613.5865991824132            |843                 |5                     |Voltage Fluctuation|247                          |
|109   |EXPO CENTER      

                                                                                

In [381]:
# import re
# from pyspark.sql.functions import lit
# csv = "/mnt/HabibData/fault_data.csv"
# # match = re.search(r'_(\d{6})', csv)# Extract month and year from the filename
# # billing_month = match.group(1) if match else "Unknown"

# # Read the CSV file into a DataFrame
# df = spark.read \
#     .option("delimiter", ",") \
#     .option("header", "true") \
#     .option("inferSchema", "true") \
#     .csv(csv)

# # df = df.withColumn("BillingMonth", lit(billing_month))# Add the BillingMonth column

# # Repartition the DataFrame
# df = df.repartition(200)

# # Write to Nessie table
# df.writeTo("nessie.oms.oms_data_raw").createOrReplace()


AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/mnt/HabibData/fault_data.csv.

In [54]:
# spark.stop()

In [53]:
# spark.sql("DROP table nessie.ageing.ageing_data_raw").show()
spark.sql("DROP table nessie.ageing.pmt_dim").show()
spark.sql("DROP table nessie.ageing.month_dim").show()
spark.sql("DROP table nessie.ageing.ibc_dim").show()
spark.sql("DROP table nessie.ageing.consumer_dim").show()
spark.sql("DROP table nessie.ageing.ageing_fact").show()
spark.sql("DROP schema nessie.ageing").show()

++
||
++
++

++
||
++
++

++
||
++
++

++
||
++
++

++
||
++
++



Py4JJavaError: An error occurred while calling o56.sql.
: org.apache.iceberg.exceptions.NamespaceNotEmptyException: Namespace 'ageing' is not empty.
	at org.apache.iceberg.nessie.NessieIcebergClient.dropNamespace(NessieIcebergClient.java:337)
	at org.apache.iceberg.nessie.NessieCatalog.dropNamespace(NessieCatalog.java:316)
	at org.apache.iceberg.spark.SparkCatalog.dropNamespace(SparkCatalog.java:533)
	at org.apache.spark.sql.execution.datasources.v2.DropNamespaceExec.run(DropNamespaceExec.scala:42)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:638)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:629)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:659)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.projectnessie.error.NessieReferenceConflictException: Namespace 'ageing' is not empty.
	at org.projectnessie.error.ErrorCode.lambda$asException$1(ErrorCode.java:66)
	at java.base/java.util.Optional.map(Optional.java:265)
	at org.projectnessie.error.ErrorCode.asException(ErrorCode.java:66)
	at org.projectnessie.client.rest.ResponseCheckFilter.checkResponse(ResponseCheckFilter.java:58)
	at org.projectnessie.client.rest.NessieHttpResponseFilter.filter(NessieHttpResponseFilter.java:29)
	at org.projectnessie.client.http.impl.jdk11.JavaRequest.lambda$executeRequest$1(JavaRequest.java:143)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
	at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085)
	at org.projectnessie.client.http.impl.jdk11.JavaRequest.executeRequest(JavaRequest.java:143)
	at org.projectnessie.client.http.HttpRequest.post(HttpRequest.java:116)
	at org.projectnessie.client.rest.v1.RestV1TreeClient.commitMultipleOperations(RestV1TreeClient.java:204)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.projectnessie.client.rest.v1.RestV1Client$ExceptionRewriter.invoke(RestV1Client.java:84)
	at com.sun.proxy.$Proxy36.commitMultipleOperations(Unknown Source)
	at org.projectnessie.client.rest.v1.HttpCommitMultipleOperations.commit(HttpCommitMultipleOperations.java:34)
	at org.apache.iceberg.nessie.NessieIcebergClient.lambda$commitRetry$14(NessieIcebergClient.java:772)
	at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
	at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
	at org.apache.iceberg.nessie.NessieIcebergClient.commitRetry(NessieIcebergClient.java:764)
	at org.apache.iceberg.nessie.NessieIcebergClient.commitRetry(NessieIcebergClient.java:748)
	at org.apache.iceberg.nessie.NessieIcebergClient.dropNamespace(NessieIcebergClient.java:322)
	... 46 more


In [None]:
# df.select('Time').limit(1).show()
df.limit(1).show(truncate=False)

In [None]:
# spark.sql("DROP TABLE IF EXISTS nessie.power_report;").show()

In [None]:
spark.sql("CREATE NAMESPACE nessie.feedervoltage;").show()


In [None]:
# spark = SparkSession.builder.config(conf=conf).getOrCreate()
# print("Spark Session Started")
df.writeTo("nessie.feedervoltage.feedervoltage_data_raw").createOrReplace()

In [None]:
spark.read.table("nessie.feedervoltage.feedervoltage_data_raw").show()

### Appending values change some datatypes

In [261]:
table_name = "nessie.starschema.crm_fact"

# List of columns with `INT` type to be altered to `BIGINT`
columns_to_alter = [
    "PMT"
]

# Generate and execute ALTER TABLE commands for each column
for column in columns_to_alter:
    alter_statement = f"""
    ALTER TABLE {table_name} 
    ALTER COLUMN `{column}` TYPE BIGINT
    """
    print(f"Executing: {alter_statement}")
    spark.sql(alter_statement)

# Verify the schema after updates
print("Updated Table Schema:")
spark.sql(f"DESCRIBE {table_name}").show(truncate=False)

Executing: 
    ALTER TABLE nessie.starschema.crm_fact 
    ALTER COLUMN `PMT` TYPE BIGINT
    


25/05/05 03:01:59 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


AnalysisException: [NOT_SUPPORTED_CHANGE_COLUMN] ALTER TABLE ALTER/CHANGE COLUMN is not supported for changing `nessie`.`starschema`.`crm_fact`'s column `PMT` with type "STRING" to `PMT` with type "BIGINT".; line 2 pos 4;
AlterColumn resolvedfieldname(StructField(PMT,StringType,true)), LongType
+- ResolvedTable org.apache.iceberg.spark.SparkCatalog@438000b7, starschema.crm_fact, nessie.starschema.crm_fact, [AccountContract#16147, Ticket No.#16148L, Business_Partner#16149, GUID#16150, IBCName#16151, PROCESS_TYPE#16152, CREATED_BY#16153, CREATED_AT#16154, UDATETIME#16155, LatestStatus#16156, Supervisor#16157, Lineman#16158, MTLNo#16159, CompletedDateTime#16160, ClosedDateTime#16161, InProcessDateTime#16162, OpenDateTime#16163, Medium Of Complaint#16164, IBCCode#16165, CREATED_DATE#16166, Source#16167, Created By#16168, Region#16169, Subject_ID#16170, ... 44 more fields]


In [None]:
csv_file_path = "Billing_part1.csv"

# Read the CSV file into a DataFrame
dfappend = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(csv_file_path)


In [None]:
dfappend.writeTo("nessie.Billing.Billing_data_raw").append()


### Accidentally appended twice, rollback

In [30]:
spark.sql("""
    SELECT *
    FROM nessie.starschema.ibc_oms_ageing_dim_4.snapshots order by `committed_at` desc limit 100
""").limit(100).show(truncate=False)


+-----------------------+-------------------+-------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id          |operation|manifest_list                                                                                                                                                    |summary                                                                                                                                                                                                                                 

In [31]:
previous_snapshot_id = "4068863865468229985"  # Replace with the ID of the snapshot before the append
spark.sql(f"""
    CALL nessie.system.rollback_to_snapshot(
        'nessie.starschema.ibc_oms_ageing_dim_4', 
        {previous_snapshot_id}
    )
""")


DataFrame[previous_snapshot_id: bigint, current_snapshot_id: bigint]

In [37]:
ls "/mnt/inc_processed"

In [288]:
spark.stop()

## Getting views


In [None]:
ageing = spark.read.table("nessie.ageing.ageing_data_raw")

In [None]:
ageing.count()

In [None]:
df = spark.table("nessie.ageing.ageing_data_raw")

In [None]:
df_filtered = ageing.filter("status LIKE 'ACT'")


In [None]:
df_repartitioned = df_filtered.repartition(400)  # Adjust number of partitions based on your data size


In [None]:
# Repartition the filtered DataFrame for performance optimization
df_repartitioned.cache()
df_repartitioned.limit(1).show()


In [None]:


# Repartition the data to optimize subsequent operations
df = spark.sql("SELECT * FROM nessie.ageing.ageing_data_raw")
df_repartitioned = df.repartition(200)  # Adjust the number of partitions based on your data size

df_repartitioned.show()


In [None]:
spark.sql("CREATE NAMESPACE nessie.ageing_active").show()

In [None]:
spark.sql("""
    DELETE FROM nessie.ageing.ageing_data_raw
    WHERE status NOT LIKE 'ACT'
""")

In [None]:
df_active = df_active.repartition(200)

In [None]:
df_active.writeTo("nessie.ageing_active.ageing_active_data_raw").createOrReplace()

In [None]:
df_active.write.repartition(100) \
    .mode("overwrite") \
    .saveAsTable("nessie.ageing.ageing_active")


In [None]:
# spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Load the data as a DataFrame
df = spark.sql("""
SELECT 
    WEEKOFYEAR(`Last Payment Date`) AS week_number,
    YEAR(`Last Payment Date`) AS `year`,
    SUM(`RegularUnits`) AS total_regular_units,
    SUM(`Last Payment Amount`) AS total_last_payment
FROM nessie.ageing_active__.ageing_active
GROUP BY YEAR(`Last Payment Date`), WEEKOFYEAR(`Last Payment Date`)
ORDER BY YEAR(`Last Payment Date`), WEEKOFYEAR(`Last Payment Date`);
""")

In [None]:
from pyspark.sql.functions import to_timestamp, regexp_replace

df_updated = df.withColumn(
    "Last Payment Date", 
    to_timestamp(
        regexp_replace(
            regexp_replace(
                regexp_replace(
                    regexp_replace(
                        regexp_replace(
                            regexp_replace(
                                regexp_replace(
                                    regexp_replace(
                                        regexp_replace(
                                            regexp_replace(
                                                regexp_replace(
                                                    regexp_replace(
                                                        df['Last Payment Date'], '-Jan-', '-01-'
                                                    ), '-Feb-', '-02-'
                                                ), '-Mar-', '-03-'
                                            ), '-Apr-', '-04-'
                                        ), '-May-', '-05-'
                                    ), '-Jun-', '-06-'
                                ), '-Jul-', '-07-'
                            ), '-Aug-', '-08-'
                        ), '-Sep-', '-09-'
                    ), '-Oct-', '-10-'
                ), '-Nov-', '-11-'
            ), '-Dec-', '-12-'
        ), 'dd-MM-yy'  # Assuming this is the desired format for TO_TIMESTAMP
    )
)


In [None]:
spark.sql("CREATE NAMESPACE nessie.ageing_active__").show()

In [None]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

df_updated.writeTo("nessie.ageing_active__.ageing_active").createOrReplace()

In [None]:
recovery_view.limit(1).show()

In [None]:
# Repartition into more than one partition (e.g., 100 partitions)
recovery_view.repartition(100).write \
    .option("header", "true") \
    .csv("recovery.csv")


In [None]:
# Instead of coalesce(1), try coalescing to a larger number of partitions (e.g., 10 or 20)
df_spark.coalesce(20).write \
    .option("header", "true") \
    .csv("recovrytry.csv")


In [None]:
# Save the DataFrame as a CSV file
recovery_view.write.option("header", "true").csv("recovery_view.csv")
 cbv

In [None]:
# Save the DataFrame as a single CSV file
recovery_view.coalesce(1).write.csv("recovery_view1.csv")

In [None]:
# Save the DataFrame as a single CSV file
df.coalesce(1).write.option("header", "true").csv("weekly_ageing.csv")

In [None]:
df = spark.read \
    .option("delimiter", ",") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("recovery_view2.csv/part-00000-fcb2ca61-ccac-4761-bac3-2da2e51a2074-c000.csv")

In [None]:
ls "recovery_view2.csv"

In [None]:
df.limit(1).show()

In [None]:
spark.stop()

#### Deleting old tables

In [175]:
schemas = spark.sql("SHOW TABLES IN nessie").limit(50).show()

+--------------------+--------------------+-----------+
|           namespace|           tableName|isTemporary|
+--------------------+--------------------+-----------+
|EnergyConsumption...|EnergyConsumption...|      false|
|              ageing|     ageing_data_raw|      false|
|        ageing_check|ageing_check_data...|      false|
|      ageing_cleaned|ageing_cleaned_da...|      false|
|             billing|    billing_data_raw|      false|
|                 crm|        crm_data_raw|      false|
|                crm_|       crm__data_raw|      false|
|           crmschema|            crm_fact|      false|
|fact_billing_and_...|fact_billing_and_...|      false|
|fact_billing_and_...|fact_billing_and_...|      false|
|fact_billing_and_...|fact_billing_and_...|      false|
|fact_billing_and_...|fact_billing_and_...|      false|
|fact_billing_and_...|fact_billing_and_...|      false|
|fact_billing_and_...|fact_billing_and_...|      false|
|fact_network_and_...|fact_network_and_...|     

In [257]:
spark.sql("DROP schema if exists nessie.ageing_check PURGE").show()

ParseException: 
[PARSE_SYNTAX_ERROR] Syntax error at or near 'PURGE': extra input 'PURGE'.(line 1, pos 42)

== SQL ==
DROP schema if exists nessie.ageing_check PURGE
------------------------------------------^^^


In [258]:
# Get all tables in all namespaces (schemas)
df = spark.sql("SHOW TABLES IN nessie")
df = df.filter("isTemporary = false")

# Get unique namespaces except 'starschema'
namespaces = df.select("namespace").distinct().rdd.flatMap(lambda x: x).collect()
schemas_to_drop = [ns for ns in namespaces if ns != "starschema"]

# Drop all tables from schemas other than 'starschema'
for schema in schemas_to_drop:
    tables = df.filter(f"namespace = '{schema}'").select("tableName").rdd.flatMap(lambda x: x).collect()
    for table in tables:
        fq_table = f"nessie.{schema}.{table}"
        print(f"Dropping table: {fq_table}")
        spark.sql(f"DROP TABLE IF EXISTS {fq_table}")

# Drop the empty schemas
for schema in schemas_to_drop:
    tables = df.filter(f"namespace = '{schema}'").select("tableName").rdd.flatMap(lambda x: x).collect()
    for table in tables:
        fq_table = f"nessie.{schema}.{table}"
        print(f"Attempting to drop: {fq_table}")
        try:
            spark.sql(f"DROP TABLE IF EXISTS {fq_table}")
        except Exception as e:
            print(f"Failed to drop {fq_table}: {str(e)}")


                                                                                

Dropping table: nessie.ageing_check.ageing_check_data_raw


Py4JJavaError: An error occurred while calling o3595.sql.
: org.apache.iceberg.exceptions.NotFoundException: Location does not exist: s3://warehouse/ageing_check/ageing_check_data_raw_4e9c5b20-7110-4955-a337-92fea4de4988/metadata/00000-dafa3847-38a0-4168-ad85-7f162829e834.metadata.json
	at org.apache.iceberg.aws.s3.S3InputStream.openStream(S3InputStream.java:194)
	at org.apache.iceberg.aws.s3.S3InputStream.positionStream(S3InputStream.java:177)
	at org.apache.iceberg.aws.s3.S3InputStream.read(S3InputStream.java:107)
	at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:539)
	at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:133)
	at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:256)
	at org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1744)
	at org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:1143)
	at org.apache.iceberg.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3809)
	at org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:280)
	at org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:273)
	at org.apache.iceberg.nessie.NessieTableOperations.lambda$doRefresh$1(NessieTableOperations.java:105)
	at org.apache.iceberg.BaseMetastoreTableOperations.lambda$refreshFromMetadataLocation$1(BaseMetastoreTableOperations.java:208)
	at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
	at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
	at org.apache.iceberg.BaseMetastoreTableOperations.refreshFromMetadataLocation(BaseMetastoreTableOperations.java:208)
	at org.apache.iceberg.nessie.NessieTableOperations.doRefresh(NessieTableOperations.java:99)
	at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:97)
	at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:80)
	at org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:49)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
	at java.base/java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1908)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
	at org.apache.iceberg.CachingCatalog.loadTable(CachingCatalog.java:166)
	at org.apache.iceberg.spark.SparkCatalog.load(SparkCatalog.java:843)
	at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:170)
	at org.apache.spark.sql.connector.catalog.TableCatalog.tableExists(TableCatalog.java:164)
	at org.apache.spark.sql.execution.datasources.v2.DropTableExec.run(DropTableExec.scala:36)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:638)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:629)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:659)
	at jdk.internal.reflect.GeneratedMethodAccessor393.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: software.amazon.awssdk.services.s3.model.NoSuchKeyException: The specified key does not exist. (Service: S3, Status Code: 404, Request ID: 183C818FDFF16110, Extended Request ID: dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8)
	at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleErrorResponse(CombinedResponseHandler.java:125)
	at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleResponse(CombinedResponseHandler.java:82)
	at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:60)
	at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:41)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:50)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:38)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:55)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:39)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
	at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:224)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:173)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$0(BaseSyncClientHandler.java:66)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:182)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:60)
	at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:52)
	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:60)
	at software.amazon.awssdk.services.s3.DefaultS3Client.getObject(DefaultS3Client.java:5174)
	at org.apache.iceberg.aws.s3.S3InputStream.openStream(S3InputStream.java:192)
	... 74 more


In [152]:
spark.sql(f"DROP SCHEMA IF EXISTS ageing CASCADE").show()

++
||
++
++



In [134]:
spark.sql("DROP table nessie.ageing_cleaned.ageing_active").show()
# spark.sql("DROP schema nessie.ageing").show()

++
||
++
++



In [139]:
spark.sql("SHOW DATABASES").show()


+---------+
|namespace|
+---------+
|  default|
+---------+



In [137]:
# Step 1: Get all schemas (databases)
all_schemas = [db.name for db in spark.catalog.listDatabases()]

# Step 2: Filter out the one you want to keep
schemas_to_drop = [db for db in all_schemas if db.lower() != "starschema"]

# Step 3: Drop each schema
for schema in all_schemas:
    # spark.sql(f"DROP DATABASE IF EXISTS {schema} CASCADE")
    print(f"Dropped schema: {schema}")


Dropped schema: default


In [138]:
print(all_schemas)

['default']


In [259]:
spark.stop()