In [None]:
import re
import http.client
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import from_json
import datetime
import time

# Credentials from the Databricks secret scope; the loader functions below read
# `storage_account_key` and `pendo_key` as module globals.
storage_account_key = dbutils.secrets.get("fakeprod", "fakekey")
analytics_pendo_apikey = dbutils.secrets.get("fakeprod", "fakekey")
pendo_key = analytics_pendo_apikey  # sent as the x-pendo-integration-key header

# Reuse the notebook's Spark session (or create one with this app name).
spark = (
    SparkSession.builder
    .appName("Dynamic Nested JSON Flatten Versions")
    .getOrCreate()
)


def get_pendo_RawData(object_name, APIlist=None):
  """
  purpose:
    GET each Pendo endpoint in APIlist and write the JSON response to the Delta
    location for object_name, one row per JSON item in a "json_string" column.
  parameters:
    object_name: destination folder name appended to the storage container URL.
    APIlist: request paths on app.pendo.io (e.g. '/api/v1/page?expand=*').
      None or empty means nothing is fetched.
  returns:
    (status, execution_log): ('success', ' No Errors') on success, otherwise
    ('failed', 'Error: <message>'). Errors are captured and returned, not raised.
  """
  status = ''
  execution_log = ''
  conn = None

  try:
    api_list = [] if APIlist is None else APIlist
    schema = StructType([StructField("json_string", StringType(), True)])
    headers = {
      'Content-Type': 'application/json',
      'x-pendo-integration-key': pendo_key
    }
    # NOTE(review): "storage_acocunt_url" looks like a placeholder conf key; the
    # ADLS key used elsewhere in this notebook is fs.azure.account.key.<account>.dfs.core.windows.net — confirm.
    spark.conf.set("storage_acocunt_url", storage_account_key)

    conn = http.client.HTTPSConnection("app.pendo.io")
    for value in api_list:
      conn.request("GET", value, '', headers=headers)
      response = conn.getresponse()
      # Read the body before checking the status so the connection can be reused.
      raw_data = response.read().decode('utf-8')
      if not 200 <= response.status < 300:
        # Without this, a JSON error body would be written to the lake as data.
        raise RuntimeError(
          f"GET {value} returned HTTP {response.status} {response.reason}: {raw_data[:500]}")

      json_data = json.loads(raw_data)
      if isinstance(json_data, list):
        rows = [(json.dumps(item),) for item in json_data]
      elif isinstance(json_data, dict):
        # A single object is stored as a one-element JSON array string (unlike
        # list items, stored individually); kept as-is for existing readers.
        rows = [(json.dumps([json_data]),)]
      else:
        # Previously left `df` unbound (first URL) or re-wrote the previous URL's data.
        raise ValueError(
          f"GET {value}: expected a JSON array or object, got {type(json_data).__name__}")

      df = spark.createDataFrame(rows, schema)
      # NOTE(review): placeholder URL prefix is concatenated directly with object_name.
      df.write.format("delta").mode("append").option("mergeSchema", "true").save(
        f"storage_account_container_url{object_name}/")

    status = 'success'
    execution_log += " No Errors"
    print(f"Completed loading {object_name} to bronze folder")

  except Exception as execution_error:
    status = 'failed'
    execution_log += f"Error: {str(execution_error)}"
  finally:
    # Close on both paths; the original leaked the connection on exceptions.
    if conn is not None:
      conn.close()

  return status, execution_log
  
def get_pendo_RawDataAggregation(object_name, payloads, DataWriteMode, APIlist=None):
  """
  purpose:
    POST an aggregation request body to each Pendo endpoint in APIlist and write
    the response's "results" to the Delta location for object_name, one row per
    result item in a "json_string" column.
  parameters:
    object_name: destination folder name appended to the storage container URL.
    payloads: request body (the callers in this notebook pass a json.dumps string;
      bytes are sent unchanged).
    DataWriteMode: Spark save mode for the write ("append" or "overwrite" here).
      With "overwrite", endpoints after the first in the same call are appended,
      so one call's results do not overwrite each other.
    APIlist: request paths on app.pendo.io (e.g. '/api/v1/aggregation').
      None or empty means nothing is fetched.
  returns:
    (status, execution_log): ('success', ' No Errors') on success, otherwise
    ('failed', 'Error: <message>'). HTTP errors, unexpected response shapes and
    Spark errors are captured and returned, not raised.
  """
  status = ''
  execution_log = ''
  conn = None

  try:
    api_list = [] if APIlist is None else APIlist
    schema = StructType([StructField("json_string", StringType(), True)])
    headers = {
      'Content-Type': 'application/json',
      'x-pendo-integration-key': pendo_key
    }
    # http.client encodes a str body as ISO-8859-1; send UTF-8 explicitly so a
    # non-Latin-1 character in a payload cannot break the request.
    body = payloads.encode('utf-8') if isinstance(payloads, str) else payloads

    # Session setup is loop-invariant: do it once, not per endpoint.
    spark.sql("USE CATALOG silver_prod_fr")
    # NOTE(review): placeholder conf key (see fs.azure.account.key.* used elsewhere) — confirm.
    spark.conf.set("storage_acocunt_url", storage_account_key)

    conn = http.client.HTTPSConnection("app.pendo.io")
    for index, value in enumerate(api_list):
      conn.request("POST", value, body, headers)
      response = conn.getresponse()
      raw_data = response.read().decode('utf-8')
      if not 200 <= response.status < 300:
        # Previously an error object without "results" became an empty write reported as success.
        raise RuntimeError(
          f"POST {value} returned HTTP {response.status} {response.reason}: {raw_data[:500]}")

      json_data = json.loads(raw_data)
      if not isinstance(json_data, dict):
        raise ValueError(
          f"POST {value}: expected a JSON object, got {type(json_data).__name__}")
      results = json_data.get("results", [])

      if isinstance(results, list):
        rows = [(json.dumps(item),) for item in results]
      elif isinstance(results, dict):
        # Single object stored as a one-element JSON array string (original behavior).
        rows = [(json.dumps([results]),)]
      else:
        # Previously left `df` unbound or re-wrote the previous endpoint's data.
        raise ValueError(
          f"POST {value}: unexpected 'results' type {type(results).__name__}")

      df = spark.createDataFrame(rows, schema)
      write_mode = "append" if index > 0 and DataWriteMode == "overwrite" else DataWriteMode
      df.write.format("delta").mode(write_mode).option("mergeSchema", "true").save(
        f"storage_account_container_url{object_name}/")

    status = 'success'
    execution_log += " No Errors"
    print(f"Completed loading {object_name} to bronze folder")

  except Exception as execution_error:
    status = 'failed'
    execution_log += f"Error: {str(execution_error)}"
  finally:
    if conn is not None:
      conn.close()

  return status, execution_log

In [None]:

# Provide the start date and end date before the run.
# The backfill walks [starttime, enddate] in `delta`-sized windows. The last
# window is clamped to enddate so the tail of the range is no longer skipped.
# Dates are pinned to UTC so the epoch-ms bounds sent to Pendo do not depend on
# the driver's local timezone (naive datetime.timestamp() uses local time).
starttime = datetime.datetime(2023, 1, 31, tzinfo=datetime.timezone.utc)
delta = datetime.timedelta(days=10)
enddate = datetime.datetime(2024, 4, 22, tzinfo=datetime.timezone.utc)

Data_WriteMode = "append"  # or "overwrite"

GETFeatures = ['/api/v1/feature?expand=*']
GETPages = ['/api/v1/page?expand=*']
GETGuides = ['/api/v1/guide?expand=*']
GETAggregation = ['/api/v1/aggregation']

# (destination object, request name, Pendo source, "created" field, "updated" field)
ENTITY_SOURCES = [
  ('FeaturesAggregation', 'features source', 'features', 'createdAt', 'lastUpdatedAt'),
  ('PagesAggregation', 'pages source', 'pages', 'createdAt', 'lastUpdatedAt'),
  ('GuidesAggregation', 'guides source', 'guides', 'createdAt', 'lastUpdatedAt'),
  ('AccountsAggregation', 'accounts source', 'accounts', 'metadata.auto.firstvisit', 'metadata.auto.lastUpdated'),
  ('VisitorsAggregation', 'visitors source', 'visitors', 'metadata.auto.firstvisit', 'metadata.auto.lastUpdated'),
]

# (destination object, Pendo event source), loaded in this order for every window.
WINDOWED_EVENT_SOURCES = [
  ('EventsFinal', 'events'),
  ('PageEventsFinal', 'pageEvents'),
  ('FeatureEventsFinal', 'featureEvents'),
  ('GuideEventsFinal', 'guideEvents'),
  ('TrackEventsFinal', 'trackEvents'),
  ('PollEventsFinal', 'pollEvents'),
]


def _weekly_events_payload(source_name, first_ms, last_ms):
  """Return the aggregation JSON body for `source_name` with weekRange buckets between two epoch-ms strings."""
  return json.dumps({
    "response": {"mimeType": "application/json"},
    "request": {
      "pipeline": [
        {
          "source": {
            source_name: None,
            "timeSeries": {"period": "weekRange", "first": first_ms, "last": last_ms},
          }
        }
      ]
    },
  })


def _changed_since_payload(request_name, source_name, created_field, updated_field, since_ms):
  """Return the aggregation JSON body for `source_name` filtered to rows created or updated at/after `since_ms`."""
  return json.dumps({
    "response": {"mimeType": "application/json"},
    "request": {
      "name": request_name,
      "pipeline": [
        {"source": {source_name: None}},
        {"filter": f"{created_field}>={since_ms} || {updated_field}>={since_ms}"},
      ],
    },
  })


def _report_failure(label, result):
  """Print a one-line notice when a loader returned ('failed', log); loaders do not raise."""
  status, execution_log = result
  if status != 'success':
    print(f"FAILED {label}: {execution_log}")


# Full (non-windowed) snapshots: load once per run. Inside the window loop they
# appended an identical full copy for every window.
for snapshot_name, snapshot_endpoints in (('Features', GETFeatures), ('Pages', GETPages), ('Guides', GETGuides)):
  _report_failure(snapshot_name, get_pendo_RawData(snapshot_name, snapshot_endpoints))

while starttime < enddate:
  endtime = min(starttime + delta, enddate)
  # The +10 s start offset is kept from the original, presumably so consecutive
  # windows do not share the boundary instant.
  # NOTE(review): it may also skip the 10 s after each boundary — confirm.
  AggregationData_StartDate = str(int(starttime.timestamp() * 1000) + 10000)
  AggregationData_EndDate = str(int(endtime.timestamp() * 1000))
  print("StartTime: " + f"{starttime}" + "  StartTime_epoch: " + AggregationData_StartDate)
  print("EndTime:   " + f"{endtime}" + "  EndTime_epoch:   " + AggregationData_EndDate)
  print("######################################################")

  # NOTE(review): these filters have no upper bound, so each window re-reads
  # everything changed since its own start; in append mode later windows repeat
  # rows from earlier ones. Consider bounding by AggregationData_EndDate.
  for object_name, request_name, source_name, created_field, updated_field in ENTITY_SOURCES:
    payload = _changed_since_payload(request_name, source_name, created_field, updated_field, AggregationData_StartDate)
    _report_failure(object_name, get_pendo_RawDataAggregation(object_name, payload, Data_WriteMode, GETAggregation))

  # NOTE(review): weekRange buckets over 10-day windows — confirm how Pendo
  # aligns buckets at window edges (possible overlap between windows).
  for object_name, source_name in WINDOWED_EVENT_SOURCES:
    payload = _weekly_events_payload(source_name, AggregationData_StartDate, AggregationData_EndDate)
    _report_failure(object_name, get_pendo_RawDataAggregation(object_name, payload, Data_WriteMode, GETAggregation))

  starttime = endtime


In [None]:
from pyspark.sql import Row
import time 

# Spark save mode passed to get_pendo_RawDataAggregation below ("overwrite" is the alternative).
Data_WriteMode = "append"

# Select the catalog that the two-part name pendo.events_metadata resolves against.
spark.sql("USE CATALOG silver_prod_fr")

# Column list of the load-tracking table; also reused as the DataFrame schema
# when marker rows are inserted.
metadata_schema = """
    object_name STRING,
    record_timestamp DATE
"""

# Create the tracking table on first run; a no-op when it already exists.
create_metadata_table_sql = f"CREATE TABLE IF NOT EXISTS pendo.events_metadata ({metadata_schema}) USING DELTA"
spark.sql(create_metadata_table_sql)

def is_data_loaded(object_name, record_timestamp):
    """Return True when pendo.events_metadata already holds a row for this object_name and date.

    Both arguments are interpolated into the SQL text as quoted literals, so they
    must not contain single quotes (callers in this notebook pass fixed names and
    date objects).
    """
    lookup_sql = f"""
        SELECT COUNT(*)
        FROM pendo.events_metadata
        WHERE object_name = '{object_name}'
        AND record_timestamp = '{record_timestamp}'
    """
    # COUNT(*) yields exactly one row with one column.
    (matching_rows,) = spark.sql(lookup_sql).first()
    return matching_rows > 0
  
def insert_into_metadata(object_name, record_timestamp):
    """Append one (object_name, record_timestamp) marker row to pendo.events_metadata."""
    # Storage account key is (re)applied on every call before writing.
    spark.conf.set("fs.azure.account.key.greenindustrydeltalake.dfs.core.windows.net", storage_account_key)
    marker_row = Row(object_name=object_name, record_timestamp=record_timestamp)
    (
        spark.createDataFrame([marker_row], schema=metadata_schema)
        .write.format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .saveAsTable("pendo.events_metadata")
    )

# How many days back from "now" to backfill, and the endpoint every request uses.
BACKFILL_DAYS = 821
GETAggregation = ['/api/v1/aggregation']
MS_PER_DAY = 24 * 60 * 60 * 1000

# (metadata object_name, Pendo event source, destination object), checked in this order.
DAILY_EVENT_SOURCES = [
    ('Events', 'events', 'Events2'),
    ('PageEvents', 'pageEvents', 'PageEvents2'),
    ('FeatureEvents', 'featureEvents', 'FeatureEvents2'),
    ('TrackEvents', 'trackEvents', 'TrackEvents2'),
    ('PollEvents', 'pollEvents', 'PollEvents2'),
    ('GuideEvents', 'guideEvents', 'GuideEvents2'),
]


def _daily_events_payload(source_name, first_ms):
    """Return the aggregation JSON body for one dayRange bucket (count=1) of `source_name` starting at `first_ms`."""
    return json.dumps({
        "response": {"mimeType": "application/json"},
        "request": {
            "pipeline": [
                {
                    "source": {
                        source_name: None,
                        "timeSeries": {"period": "dayRange", "first": first_ms, "count": 1},
                    }
                }
            ]
        },
    })


current_time_seconds = time.time()
current_time_ms = int(current_time_seconds * 1000)
for day in range(BACKFILL_DAYS):
    # NOTE(review): despite the name this is the current instant shifted back `day`
    # days, not midnight; the metadata key is the driver-local date of that instant.
    # Confirm how Pendo aligns dayRange buckets before changing it.
    start_of_day_ms = current_time_ms - day * MS_PER_DAY
    start_of_day_date = datetime.datetime.fromtimestamp(start_of_day_ms / 1000).date()

    # Load every event type still missing for this day. The original if/elif
    # chain loaded only the first missing type per day per run.
    pending = 0
    for object_name, source_name, target_name in DAILY_EVENT_SOURCES:
        if is_data_loaded(object_name, start_of_day_date):
            continue
        pending += 1
        status, execution_log = get_pendo_RawDataAggregation(
            target_name, _daily_events_payload(source_name, start_of_day_ms), Data_WriteMode, GETAggregation)
        if status != 'success':
            # Do not record the day as loaded, so the next run retries it.
            print(f"{object_name} load failed for day: {day} ({execution_log})")
            continue
        insert_into_metadata(object_name, start_of_day_date)
        print(f"{object_name} data loaded for day:", day)

    if pending == 0:
        print("All events already loaded for day:", day)

In [None]:
# Today_date = spark.sql("Select date_format(current_date(),'yyyyMMdd')").collect()[0][0] 

# Visitors_data = spark.read.format("delta").load("url_to_visitors_data_in_storage_account")
# # Show the content of the DataFrame
# Visitors_data.show()  
# Visitors_data.write.format("delta").mode("append").option("mergeSchema","true").saveAsTable("pendo_all_visitors_test")
# print("Visitors data read complete")

# Accounts_data = spark.read.format("delta").load("url_to_accounts_data_in_storage_account")
# # Show the content of the DataFrame
# Accounts_data.show()  
# Accounts_data.write.format("delta").mode("append").option("mergeSchema","true").saveAsTable("pendo_all_accounts_test")
# print("Accounts data read complete")

# Pages_data = spark.read.format("delta").load("url_to_pages_data_in_storage_account")
# # # Show the content of the DataFrame
# # Pages_data.show()  
# Pages_data.write.format("delta").mode("append").option("mergeSchema","true").saveAsTable("pendo_all_pages_data_test")
# print("Pages data read complete")

# Features_data = spark.read.format("delta").load("url_to_features_data_in_storage_account")
# # # Show the content of the DataFrame
# # Features_data.show()  
# Features_data.write.format("delta").mode("append").option("mergeSchema","true").saveAsTable("pendo_all_features_test")
# print("Features data read complete")

# Dependencies_data = spark.read.format("delta").load("url_to_dependencies_data_in_storage_account")
# # # Show the content of the DataFrame
# # Dependencies_data.show()  
# Dependencies_data.write.format("delta").mode("append").option("mergeSchema","true").saveAsTable("pendo_all_Dependencies_test")
# print("Dependencies data read complete")

