In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Discover the date folders under daily_refresh and keep only those that
# contain a listable features_extracted sub-folder.
files = dbutils.fs.ls("abfss://passthrough@raicnprdpassthrough.dfs.core.windows.net/source/daily_refresh")
# Each FileInfo's name (field 1) ends with "/" for folders; [:-1] strips it.
dates = [info.name[:-1] for info in files]


def _has_features_extracted(date_folder):
  """Return True if `<date_folder>/features_extracted` can be listed.

  Any listing failure (missing folder, permission error, ...) counts as
  "not available", which mirrors the original best-effort check.
  """
  try:
    dbutils.fs.ls(f"abfss://passthrough@raicnprdpassthrough.dfs.core.windows.net/source/daily_refresh/{date_folder}/features_extracted")
  except Exception:
    return False
  return True


# Folders without extracted features are recorded, then filtered out.
remove_dates = [dt for dt in dates if not _has_features_extracted(dt)]
dates = [dt for dt in dates if dt not in remove_dates]

# Look up which data_dt values are already loaded into the historical
# robocalling Delta table. If the table cannot be read (e.g. it does not
# exist yet on a first run), treat it as empty.
historical_path = "abfss://famli-dev-workspace@famlisandbox.dfs.core.windows.net/eh163e/data/robocalling/historical_robo_features"
try:
  historical_robocalling = spark.read.format('delta').load(historical_path)
  # List of Row objects, one per distinct data_dt value.
  historical_dates = (historical_robocalling
                      .select("data_dt")
                      .distinct()
                      .collect())
except Exception:
  historical_dates = []

# Plain data_dt values already present in the historical table. Always
# defined (an empty list on a first run) so the filter below cannot fail
# with NameError when the historical table is missing or empty.
historical_dates1 = [row['data_dt'] for row in historical_dates]
loaded_dates = set(historical_dates1)  # O(1) membership for the filter

# Dates with extracted features that are not loaded yet. The "2" prefix check
# keeps only folder names starting with "2"; presumably these are YYYYMMDD
# dates and other listing entries are skipped. TODO confirm intent.
dates_notin = [dt for dt in dates if dt not in loaded_dates and dt.startswith("2")]
print("Dates data not present")
print(dates_notin)

# Append each not-yet-loaded date's features_extracted CSV data to the
# aggregated historical robocalling Delta table, partitioned by data_dt.
# Loading is best-effort per date: a failure is reported (with its date) and
# the remaining dates still load. An empty dates_notin is simply a no-op.
for date in dates_notin:
  try:
    df = (spark.read.format("csv")
          .option("header", True)
          .load(f"abfss://passthrough@raicnprdpassthrough.dfs.core.windows.net/source/daily_refresh/{date}/features_extracted")
          .withColumn("data_dt", lit(str(date))))
    # mergeSchema lets columns that first appear in a later date's CSVs be
    # added to the table schema instead of failing the append.
    (df.write
       .partitionBy("data_dt")
       .format("delta")
       .option("mergeSchema", "true")
       .mode("append")
       .save("abfss://famli-dev-workspace@famlisandbox.dfs.core.windows.net/eh163e/data/robocalling/historical_robo_features"))
  except Exception as e:
    # Report which date failed so it can be investigated and re-run.
    print(f"Failed to load {date}: {e}")

In [0]:
# Per-day record counts of the historical table with a running (cumulative)
# total. The table is also registered as the temp view "df" for ad-hoc SQL.
df = spark.read.format("delta").load("abfss://famli-dev-workspace@famlisandbox.dfs.core.windows.net/eh163e/data/robocalling/historical_robo_features")
df.createOrReplaceTempView("df")
# data_dt values are written as 'YYYYMMDD' strings by the loader, so string
# ordering is chronological. The final ORDER BY is required: the window's own
# ORDER BY does not guarantee the order of the returned rows.
# The "Cummulative_Count" spelling is kept for compatibility with consumers.
df_count = spark.sql('''
  with data as (
    select data_dt as Date,
           count(data_dt) as Record_Count
    from df
    group by data_dt
  )
  select Date,
         Record_Count,
         sum(Record_Count) over (order by Date asc
                                 rows between unbounded preceding and current row) as Cummulative_Count
  from data
  order by Date asc''')

display(df_count)

Date,Record_Count,Cummulative_Count
20210101,215400848,215400848
20210102,168176856,383577704
20210103,159329473,542907177
20210104,206148618,749055795
20210105,208479945,957535740
20210106,210877696,1168413436
20210107,210450020,1378863456
20210108,209132803,1587996259
20210109,177089773,1765086032
20210110,162445377,1927531409


In [0]:
# import pandas as pd

# min_dt = min(historical_dates1) # get lowest date ('YYYYMMDD' string)
# max_dt = max(historical_dates1) # get highest date

# dt_range = pd.date_range(min_dt, max_dt) # get all requisite dates in range
# # historical_dates1 holds 'YYYYMMDD' strings, so format each Timestamp the same way before comparing
# missing_dts = [d.strftime("%Y%m%d") for d in dt_range if d.strftime("%Y%m%d") not in historical_dates1] # list missing
# print("There are {n} missing dates".format(n=len(missing_dts)))
